Skip to content
28 changes: 24 additions & 4 deletions nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import print_function, division, unicode_literals, absolute_import

# Import packages
import os
from multiprocessing import Process, Pool, cpu_count, pool
from traceback import format_exception
import sys
Expand Down Expand Up @@ -47,6 +48,8 @@ def run_node(node, updatehash, taskid):
the node to run
updatehash : boolean
flag for updating hash
taskid : int
an identifier for this task

Returns
-------
Expand All @@ -68,6 +71,10 @@ def run_node(node, updatehash, taskid):
return result


def _init_worker(cwd):
    """Set the working directory of a freshly started pool worker.

    Intended to be used as the ``initializer`` of the process pool, so
    each worker begins in a known directory.

    Parameters
    ----------
    cwd : str
        path of the directory the worker process changes into

    """
    os.chdir(cwd)


class NonDaemonProcess(Process):
"""A non-daemon process to support internal multiprocessing.
"""
Expand Down Expand Up @@ -128,6 +135,10 @@ def __init__(self, plugin_args=None):
self._task_obj = {}
self._taskid = 0

# Cache current working directory and make sure we
# change to it when workers are set up
self._cwd = os.getcwd()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the workflow base directory or the CWD of the shell? Could this cause things to dump into the local directory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't because interfaces handle their WD. I think this is fixing an edge case for fmriprep where we are spinning up and killing workers all the time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok it turns out that some SimpleInterfaces are writing to the workflow base directory :(

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well that's... suboptimal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually worrying, because it means that, without this patch, those interfaces are being run in some other, unexpected path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently it is a problem only with SimpleInterface.


# Read in options or set defaults.
non_daemon = self.plugin_args.get('non_daemon', True)
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
Expand All @@ -140,19 +151,28 @@ def __init__(self, plugin_args=None):

# Instantiate different thread pools for non-daemon processes
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
self.memory_gb)
'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
self.processors, self.memory_gb, self._cwd)

NipypePool = NonDaemonPool if non_daemon else Pool
try:
self.pool = NipypePool(processes=self.processors,
maxtasksperchild=maxtasks)
self.pool = NipypePool(
processes=self.processors,
maxtasksperchild=maxtasks,
initializer=_init_worker,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this shouldn't just be os.chdir?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really

initargs=(self._cwd,)
)
except TypeError:
# Python < 3.2 does not have maxtasksperchild
# When maxtasksperchild is not set, initializer is not to be
# called.
self.pool = NipypePool(processes=self.processors)

self._stats = None

def _async_callback(self, args):
    """Record the payload of a finished task under its task id.

    Parameters
    ----------
    args : mapping
        payload passed to the callback; it must provide a ``'taskid'``
        key, which is used as the key in ``self._taskresult``

    """
    # Make sure runtime is not left at a dubious working directory
    os.chdir(self._cwd)
    self._taskresult[args['taskid']] = args

def _get_result(self, taskid):
Expand Down