SLURMGraph plugin #1136
Merged
Changes from 9 commits (13 commits in total):
- 92635c0 Inintial work on SLURMGraph plugin (chrisgorgo)
- 5168012 fixed copypaste bug (chrisgorgo)
- bb2f6ed remove SGE remnants (chrisgorgo)
- c3e6f02 fixed caching (chrisgorgo)
- 1db8ee6 fixed template (chrisgorgo)
- b68ee6d fixed whitespace in the dependencies (chrisgorgo)
- 5fb4b94 space! (chrisgorgo)
- 9c83152 Improved docs. (chrisgorgo)
- cfb6fc6 updated changelog (chrisgorgo)
- 87673aa Merge branch 'master' into enh/slurmgraph (chrisgorgo)
- 73cb598 Remove run_without_submitting code (chrisgorgo)
- 4486775 calling non existing variable (chrisgorgo)
- 9cfb703 extra condition ported from SLURMGraph (chrisgorgo)
```python
"""Parallel workflow execution via SLURM
"""

import os
import sys

from .base import (GraphPluginBase, logger)

from ...interfaces.base import CommandLine


def node_completed_status(checknode):
    """
    A function to determine if a node has previously completed its work
    :param checknode: The node to check the run status
    :return: boolean value True indicates that the node does not need to be run.
    """
    """ TODO: place this in the base.py file and refactor """
    node_state_does_not_require_overwrite = (checknode.overwrite is False or
                                            (checknode.overwrite is None and
                                             not checknode._interface.always_run))
    hash_exists = False
    try:
        hash_exists, _, _, _ = checknode.hash_exists()
    except Exception:
        hash_exists = False
    return (hash_exists and node_state_does_not_require_overwrite)


class SLURMGraphPlugin(GraphPluginBase):
    """Execute using SLURM

    The plugin_args input to run can be used to control the SLURM execution.
    Currently supported options are:

    - template : template to use for batch job submission
    - sbatch_args : arguments to be passed to sbatch in the
      sbatch call

    """
    _template = "#!/bin/bash"

    def __init__(self, **kwargs):
        if 'plugin_args' in kwargs and kwargs['plugin_args']:
            if 'retry_timeout' in kwargs['plugin_args']:
                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
            if 'max_tries' in kwargs['plugin_args']:
                self._max_tries = kwargs['plugin_args']['max_tries']
            if 'template' in kwargs['plugin_args']:
                self._template = kwargs['plugin_args']['template']
                if os.path.isfile(self._template):
                    with open(self._template) as fh:
                        self._template = fh.read()
            if 'sbatch_args' in kwargs['plugin_args']:
                self._sbatch_args = kwargs['plugin_args']['sbatch_args']
            if 'dont_resubmit_completed_jobs' in kwargs['plugin_args']:
                self._dont_resubmit_completed_jobs = \
                    kwargs['plugin_args']['dont_resubmit_completed_jobs']
            else:
                self._dont_resubmit_completed_jobs = False
        super(SLURMGraphPlugin, self).__init__(**kwargs)

    def _submit_graph(self, pyfiles, dependencies, nodes):
        def make_job_name(jobnumber, nodeslist):
            """
            - jobnumber: The index number of the job to create
            - nodeslist: The name of the node being processed
            - return: A string representing this job to be displayed by SLURM
            """
            job_name = 'j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
            # Condition job_name to be a valid bash identifier (i.e. - is invalid)
            job_name = job_name.replace('-', '_').replace('.', '_').replace(':', '_')
            return job_name

        batch_dir, _ = os.path.split(pyfiles[0])
        submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')

        cache_doneness_per_node = dict()
        if self._dont_resubmit_completed_jobs:  # A future parameter for controlling this behavior could be added here
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                node_status_done = node_completed_status(node)
                # If a node has no dependencies, and it is requested to
                # run_without_submitting, then run this node in place
                if (not node_status_done) and (len(dependencies[idx]) == 0) and (node.run_without_submitting == True):
                    try:
                        node.run()
                    except Exception:
                        node._clean_queue(idx, nodes)
                    node_status_done = True  # if successfully run locally, then claim true

                # If the node itself claims done, then check to ensure all
                # dependencies are also done
                if node_status_done and idx in dependencies:
                    for child_idx in dependencies[idx]:
                        if child_idx in cache_doneness_per_node:
                            child_status_done = cache_doneness_per_node[child_idx]
                        else:
                            child_status_done = node_completed_status(nodes[child_idx])
                        node_status_done = node_status_done and child_status_done

                cache_doneness_per_node[idx] = node_status_done

        with open(submitjobsfile, 'wt') as fp:
            fp.writelines('#!/usr/bin/env bash\n')
            fp.writelines('# Condense format attempted\n')
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                if cache_doneness_per_node.get(idx, False):
                    continue
                else:
                    template, sbatch_args = self._get_args(
                        node, ["template", "sbatch_args"])

                    batch_dir, name = os.path.split(pyscript)
                    name = '.'.join(name.split('.')[:-1])
                    batchscript = '\n'.join((template,
                                             '%s %s' % (sys.executable, pyscript)))
                    batchscriptfile = os.path.join(batch_dir,
                                                   'batchscript_%s.sh' % name)

                    batchscriptoutfile = batchscriptfile + '.o'
                    batchscripterrfile = batchscriptfile + '.e'

                    with open(batchscriptfile, 'wt') as batchfp:
                        batchfp.writelines(batchscript)
                    deps = ''
                    if idx in dependencies:
                        values = ''
                        for jobid in dependencies[idx]:
                            # Avoid dependencies on jobs that are already done
                            if not self._dont_resubmit_completed_jobs or cache_doneness_per_node[jobid] == False:
                                values += "${{{0}}}:".format(make_job_name(jobid, nodes))
                        if values != '':  # i.e. if some jobs were added to the dependency list
                            values = values.rstrip(':')
                            deps = '--dependency=afterok:%s' % values
                    jobname = make_job_name(idx, nodes)
                    # Do not use default output locations if they are set in self._sbatch_args
                    stderrFile = ''
                    if self._sbatch_args.count('-e ') == 0:
                        stderrFile = '-e {errFile}'.format(
                            errFile=batchscripterrfile)
                    stdoutFile = ''
                    if self._sbatch_args.count('-o ') == 0:
                        stdoutFile = '-o {outFile}'.format(
                            outFile=batchscriptoutfile)
                    full_line = '{jobNm}=$(sbatch {outFileOption} {errFileOption} {extraSBatchArgs} {dependantIndex} -J {jobNm} {batchscript} | awk \'{{print $4}}\')\n'.format(
                        jobNm=jobname,
                        outFileOption=stdoutFile,
                        errFileOption=stderrFile,
                        extraSBatchArgs=sbatch_args,
                        dependantIndex=deps,
                        batchscript=batchscriptfile)
                    fp.writelines(full_line)
        cmd = CommandLine('bash', environ=os.environ.data,
                          terminal_output='allatonce')
        cmd.inputs.args = '%s' % submitjobsfile
        cmd.run()
        logger.info('submitted all jobs to queue')
```
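The dependency handling in `_submit_graph` captures each sbatch job ID in a bash variable named after the job, then strings those variables into a single `--dependency=afterok:` option for downstream jobs. A minimal standalone sketch of that logic, with a hypothetical `build_dependency_option` helper and plain string node IDs standing in for nipype node objects:

```python
def make_job_name(jobnumber, node_ids):
    """Mirror of the plugin's helper: build a bash-safe job name."""
    job_name = 'j{0}_{1}'.format(jobnumber, node_ids[jobnumber])
    # '-', '.' and ':' are invalid in bash identifiers, so replace them
    return job_name.replace('-', '_').replace('.', '_').replace(':', '_')


def build_dependency_option(idx, dependencies, node_ids):
    """Compose the --dependency=afterok:... option for job `idx`.

    Each upstream job contributes a ${jobname} shell variable that bash
    expands to the SLURM job ID captured at submission time.
    """
    values = ':'.join('${{{0}}}'.format(make_job_name(jobid, node_ids))
                      for jobid in dependencies.get(idx, []))
    return '--dependency=afterok:%s' % values if values else ''


# Hypothetical three-node graph: 'modelfit' depends on the other two nodes
node_ids = ['realign.a0', 'smooth-fwhm6', 'modelfit']
deps = {2: [0, 1]}
print(build_dependency_option(2, deps, node_ids))
# --dependency=afterok:${j0_realign_a0}:${j1_smooth_fwhm6}
```

Because the submit script sets `j0_realign_a0=$(sbatch ... | awk '{print $4}')` for each upstream job, the `${...}` references resolve to real job IDs when `submit_jobs.sh` runs.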
Review comment:
run_without_submitting shouldn't apply to a graph plugin - that option is to ensure nodes can run on the monitoring/submitting node. in the graph plugin case, since the scheduler takes over, this check would be unnecessary. i know this was taken from the SGEgraph code (and i didn't review it closely enough :( ).