Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 97 additions & 43 deletions nipype/pipeline/plugins/sgegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@
from ...interfaces.base import CommandLine


def node_completed_status(checknode):
    """
    Determine whether a node has previously completed its work.

    TODO: place this in the base.py file and refactor.

    :param checknode: The node whose run status should be checked.
    :return: True if the node does not need to be run again
             (its hash exists and no overwrite is required).
    """
    # Overwrite is not required when it is explicitly disabled, or when it
    # is unset and the interface is not flagged to always run.
    node_state_does_not_require_overwrite = (
        checknode.overwrite is False or
        (checknode.overwrite is None and
         not checknode._interface.always_run)
    )
    hash_exists = False
    try:
        hash_exists, _, _, _ = checknode.hash_exists()
    except Exception:
        # Best effort: if the hash cannot be computed for any reason,
        # treat the node as not yet completed.
        hash_exists = False
    return hash_exists and node_state_does_not_require_overwrite


class SGEGraphPlugin(GraphPluginBase):
"""Execute using SGE

Expand Down Expand Up @@ -45,56 +64,91 @@ def make_job_name(jobnumber, nodeslist):
- nodeslist: The name of the node being processed
- return: A string representing this job to be displayed by SGE
"""
return 'j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
job_name='j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
# Condition job_name to be a valid bash identifier (i.e. - is invalid)
job_name=job_name.replace('-','_').replace('.','_').replace(':','_')
return job_name
batch_dir, _ = os.path.split(pyfiles[0])
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')

cache_doneness_per_node = dict()
if True: ## A future parameter for controlling this behavior could be added here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we can add the config option.

for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
node_status_done = node_completed_status(node)
## If a node has no dependencies, and it is requested to run_without_submitting
## then run this node in place
if (not node_status_done) and (len(dependencies[idx]) == 0 ) and (node.run_without_submitting == True):
try:
node.run()
except Exception:
node._clean_queue(idx, nodes)
node_status_done = True # if successfully run locally, then claim true

#if the node itself claims done, then check to ensure all
#dependencies are also done
if node_status_done and idx in dependencies:
for child_idx in dependencies[idx]:
if child_idx in cache_doneness_per_node:
child_status_done = cache_doneness_per_node[child_idx]
else:
child_status_done = node_completed_status(nodes[child_idx])
node_status_done = node_status_done and child_status_done

cache_doneness_per_node[idx] = node_status_done

with open(submitjobsfile, 'wt') as fp:
fp.writelines('#!/usr/bin/env bash\n')
fp.writelines('# Condense format attempted\n')
for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
template, qsub_args = self._get_args(
node, ["template", "qsub_args"])

batch_dir, name = os.path.split(pyscript)
name = '.'.join(name.split('.')[:-1])
batchscript = '\n'.join((template,
'%s %s' % (sys.executable, pyscript)))
batchscriptfile = os.path.join(batch_dir,
'batchscript_%s.sh' % name)

batchscriptoutfile = batchscriptfile + '.o'
batchscripterrfile = batchscriptfile + '.e'

with open(batchscriptfile, 'wt') as batchfp:
batchfp.writelines(batchscript)
batchfp.close()
deps = ''
if idx in dependencies:
values = ' '
for jobid in dependencies[idx]:
values += make_job_name(jobid, nodes)
if values != ' ': # i.e. if some jobs were added to dependency list
values = values.rstrip(',')
deps = '-hold_jid%s' % values
jobname = make_job_name(idx, nodes)
# Do not use default output locations if they are set in self._qsub_args
stderrFile = ''
if self._qsub_args.count('-e ') == 0:
stderrFile = '-e {errFile}'.format(
errFile=batchscripterrfile)
stdoutFile = ''
if self._qsub_args.count('-o ') == 0:
stdoutFile = '-o {outFile}'.format(
outFile=batchscriptoutfile)
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript} | awk \'{{print $3}}\')\n'.format(
jobNm=jobname,
outFileOption=stdoutFile,
errFileOption=stderrFile,
extraQSubArgs=qsub_args,
dependantIndex=deps,
batchscript=batchscriptfile)
fp.writelines(full_line)
if cache_doneness_per_node.get(idx,False):
continue
else:
template, qsub_args = self._get_args(
node, ["template", "qsub_args"])

batch_dir, name = os.path.split(pyscript)
name = '.'.join(name.split('.')[:-1])
batchscript = '\n'.join((template,
'%s %s' % (sys.executable, pyscript)))
batchscriptfile = os.path.join(batch_dir,
'batchscript_%s.sh' % name)

batchscriptoutfile = batchscriptfile + '.o'
batchscripterrfile = batchscriptfile + '.e'

with open(batchscriptfile, 'wt') as batchfp:
batchfp.writelines(batchscript)
batchfp.close()
deps = ''
if idx in dependencies:
values = ' '
for jobid in dependencies[idx]:
## Avoid dependencies of done jobs
if cache_doneness_per_node[jobid] == False:
values += "${{{0}}},".format(make_job_name(jobid, nodes))
if values != ' ': # i.e. if some jobs were added to dependency list
values = values.rstrip(',')
deps = '-hold_jid%s' % values
jobname = make_job_name(idx, nodes)
# Do not use default output locations if they are set in self._qsub_args
stderrFile = ''
if self._qsub_args.count('-e ') == 0:
stderrFile = '-e {errFile}'.format(
errFile=batchscripterrfile)
stdoutFile = ''
if self._qsub_args.count('-o ') == 0:
stdoutFile = '-o {outFile}'.format(
outFile=batchscriptoutfile)
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript} | awk \'{{print $3}}\')\n'.format(
jobNm=jobname,
outFileOption=stdoutFile,
errFileOption=stderrFile,
extraQSubArgs=qsub_args,
dependantIndex=deps,
batchscript=batchscriptfile)
fp.writelines(full_line)
cmd = CommandLine('bash', environ=os.environ.data,
terminal_output='allatonce')
cmd.inputs.args = '%s' % submitjobsfile
Expand Down