6 changes: 6 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -40,6 +40,7 @@ import nextflow.conda.CondaConfig
import nextflow.config.Manifest
import nextflow.container.ContainerConfig
import nextflow.dag.DAG
import nextflow.dag.TaskGraph
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
@@ -193,6 +194,8 @@ class Session implements ISession {

private DAG dag

private TaskGraph taskGraph

private CacheDB cache

private Barrier processesBarrier = new Barrier()
@@ -345,6 +348,7 @@ class Session implements ISession {

// -- DAG object
this.dag = new DAG()
this.taskGraph = new TaskGraph()

// -- init work dir
this.workDir = ((config.workDir ?: 'work') as Path).complete()
@@ -800,6 +804,8 @@ class Session implements ISession {

DAG getDag() { this.dag }

TaskGraph getTaskGraph() { this.taskGraph }

ExecutorService getExecService() { execService }

/**
modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy
@@ -29,7 +29,7 @@ import java.nio.file.Path
class CytoscapeHtmlRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderProcessGraph(DAG dag, Path file) {
String tmplPage = readTemplate()
String network = CytoscapeJsRenderer.renderNetwork(dag)
file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network)
modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy
@@ -29,7 +29,7 @@ import java.nio.file.Path
class CytoscapeJsRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderProcessGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
@@ -41,7 +41,7 @@ import nextflow.script.params.OutputsList
import nextflow.script.params.TupleInParam
import nextflow.script.params.TupleOutParam
/**
* Model a direct acyclic graph of the pipeline execution.
* Model the process graph of a pipeline execution.
*
* @author Paolo Di Tommaso <[email protected]>
*/
15 changes: 12 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy
@@ -24,10 +24,19 @@ import java.nio.file.Path
* @author Paolo Di Tommaso <[email protected]>
* @author Mike Smoot <[email protected]>
*/
interface DagRenderer {
trait DagRenderer {

/**
* Render the dag to the specified file.
* Render a process DAG.
*/
void renderDocument(DAG dag, Path file);
void renderProcessGraph(DAG dag, Path file) {
throw new UnsupportedOperationException("Process graph rendering is not supported for this file format")
}

/**
* Render a task DAG.
*/
void renderTaskGraph(TaskGraph dag, Path file) {
throw new UnsupportedOperationException("Task graph rendering is not supported for this file format")
}
}
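
Because DagRenderer is now a Groovy trait rather than an interface, a renderer only has to implement the graph flavours it actually supports; whatever it leaves out falls back to the UnsupportedOperationException defaults above. A minimal sketch, assuming a hypothetical format that only handles the process view (the class name and output below are illustrative, not part of this change):

    import java.nio.file.Path
    import nextflow.dag.DAG
    import nextflow.dag.DagRenderer

    // hypothetical renderer, shown only to illustrate the trait defaults
    class ProcessOnlyRenderer implements DagRenderer {

        @Override
        void renderProcessGraph(DAG dag, Path file) {
            // write the process-level graph in whatever format this renderer targets
            file.text = 'process graph placeholder'
        }

        // renderTaskGraph() is inherited from the trait and throws
        // UnsupportedOperationException until this format adds task-graph support
    }
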
modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy
@@ -46,7 +46,7 @@ class DotRenderer implements DagRenderer {
static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') }

@Override
void renderDocument(DAG dag, Path file) {
void renderProcessGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy
@@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer {
}

@Override
void renderDocument(DAG dag, Path file) {
void renderProcessGraph(DAG dag, Path file) {
final Charset charset = Charset.defaultCharset()
Writer bw = Files.newBufferedWriter(file, charset)
final XMLOutputFactory xof = XMLOutputFactory.newFactory()
modules/nextflow/src/main/groovy/nextflow/dag/GraphvizRenderer.groovy
@@ -42,7 +42,7 @@ class GraphvizRenderer implements DagRenderer {
* See http://www.graphviz.org for more info.
*/
@Override
void renderDocument(DAG dag, Path target) {
void renderProcessGraph(DAG dag, Path target) {
def result = Files.createTempFile('nxf-',".$format")
def temp = Files.createTempFile('nxf-','.dot')
// save the DAG as `dot` to a temp file
modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy
@@ -27,11 +27,7 @@ import java.nio.file.Path
class MermaidRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

String renderNetwork(DAG dag) {
void renderProcessGraph(DAG dag, Path file) {
def lines = []
lines << "flowchart TD"

@@ -45,7 +41,7 @@ class MermaidRenderer implements DagRenderer {

lines << ""

return lines.join('\n')
file.text = lines.join('\n')
}

private String renderVertex(DAG.Vertex vertex) {
@@ -76,4 +72,25 @@ class MermaidRenderer implements DagRenderer {

return "${edge.from.name} -->${label} ${edge.to.name}"
}

@Override
void renderTaskGraph(TaskGraph graph, Path file) {
def lines = []
lines << "flowchart TD"

graph.nodes.values().each { node ->
lines << " ${node.getSlug()}[\"${node.label}\"]"

node.predecessors.each { key ->
final pred = graph.nodes[key]

if( pred )
lines << " ${pred.getSlug()} --> ${node.getSlug()}"
}
}

lines << ""

file.text = lines.join('\n')
}
}
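
For reference, renderTaskGraph() emits the same flowchart TD layout as the process view: one node per cached or submitted task, labelled with the truncated hash and task name, plus an edge from every resolved predecessor. A hypothetical two-task run (hashes and task names invented for illustration) would yield output along these lines:

    flowchart TD
        t0["[9a/1b2c3d] foo"]
        t1["[4e/5f6a7b] bar (1)"]
        t0 --> t1
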
14 changes: 13 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy
@@ -22,6 +22,7 @@ import groovyx.gpars.dataflow.operator.DataflowProcessor
import nextflow.Global
import nextflow.Session
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.script.params.InputsList
import nextflow.script.params.OutputsList
/**
@@ -41,7 +42,7 @@ class NodeMarker {
static private Session getSession() { Global.session as Session }

/**
* Creates a new vertex in the DAG representing a computing `process`
* Creates a new vertex in the DAG representing a computing `process`
*
* @param label The label associated to the process
* @param inputs The list of inputs entering in the process
@@ -88,4 +89,15 @@ class NodeMarker {
session.dag.addDataflowBroadcastPair(readChannel, broadcastChannel)
}

/**
* Creates a new node in the DAG representing a task
*
* @param task
* @param hash
*/
static void addTaskNode( TaskRun task, String hash ) {
if( session?.taskGraph && !session.aborted )
session.taskGraph.addTaskNode( task, hash )
}

}
77 changes: 77 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/dag/TaskGraph.groovy
@@ -0,0 +1,77 @@
/*
* Copyright 2020-2022, Seqera Labs
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import java.nio.file.Path
import java.util.regex.Pattern

import groovy.transform.MapConstructor
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.processor.TaskRun
/**
* Model the task graph of a pipeline execution.
*
* @author Ben Sherman <[email protected]>
*/
@Slf4j
class TaskGraph {

Map<String,Node> nodes = new HashMap<>(100)

/**
* Create a new node for a task
*
* @param task
* @param hash
*/
synchronized void addTaskNode( TaskRun task, String hash ) {
final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}"
final preds = task.getInputFilesMap().values()
.collect { p -> getPredecessorHash(p) }
.findAll { h -> h != null }

nodes[hash] = new Node(
index: nodes.size(),
label: label,
predecessors: preds
)
}

protected String getPredecessorHash(Path path) {
final pattern = Pattern.compile('.*/([a-z0-9]{2}/[a-z0-9]{30})')
final matcher = pattern.matcher(path.toString())

matcher.find() ? matcher.group(1).replace('/', '') : null
}

@MapConstructor
@ToString(includeNames = true, includes = 'label', includePackage=false)
protected class Node {

int index

String label

List<String> predecessors

String getSlug() { "t${index}" }

}

}
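
getPredecessorHash() recovers the upstream task for each input file by matching the path against the standard work-directory layout (<workDir>/xy/<30-character suffix>/...) and joining the two components back into the full hash; inputs that do not come from another task's work directory yield null and are filtered out by addTaskNode(). A quick sketch of the behaviour, using an invented path:

    import java.util.regex.Pattern

    def pattern = Pattern.compile('.*/([a-z0-9]{2}/[a-z0-9]{30})')
    def input   = '/data/work/9a/1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d/sample.bam'
    def matcher = pattern.matcher(input)

    assert matcher.find()
    // the two path components collapse back into the producing task's hash
    assert matcher.group(1).replace('/', '') == '9a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d'
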
modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
@@ -524,7 +524,7 @@ class TaskProcessor {
def invoke = new InvokeTaskAdapter(this, opInputs.size())
session.allOperators << (operator = new DataflowOperator(group, params, invoke))

// notify the creation of a new vertex the execution DAG
// notify the creation of a new process in the DAG
NodeMarker.addProcessNode(this, config.getInputs(), config.getOutputs())

// fix issue #41
@@ -756,8 +756,11 @@ class TaskProcessor {

log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry"
def cached = shouldTryCache && exists && checkCachedOutput(task.clone(), resumeDir, hash, entry)
if( cached )
if( cached ) {
// add cached task to the task graph
NodeMarker.addTaskNode(task, hash.toString())
break
}
}
catch (Throwable t) {
log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t)
@@ -780,6 +783,9 @@
lock.release()
}

// add submitted task to the task graph
NodeMarker.addTaskNode(task, hash.toString())

// submit task for execution
submitTask( task, hash, workDir )
break
modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy
@@ -93,6 +93,7 @@ class DefaultObserverFactory implements TraceObserverFactory {
if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new GraphObserver(traceFile)
config.navigate('dag.type') { observer.graphType = it ?: 'process' }
config.navigate('dag.overwrite') { observer.overwrite = it }
result << observer
}
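
Because the factory falls back to 'process' when dag.type is not set, existing pipelines keep the current process-level DAG; opting into the task-level graph is a configuration change. A sketch of what that could look like in nextflow.config (file name and overwrite flag are illustrative; only the type option is new in this change):

    dag {
        enabled   = true
        file      = 'flowchart.mmd'
        overwrite = true
        type      = 'task'    // new in this change; defaults to 'process'
    }
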
modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy
@@ -24,6 +24,7 @@ import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.dag.CytoscapeHtmlRenderer
import nextflow.dag.DAG
import nextflow.dag.TaskGraph
import nextflow.dag.DagRenderer
import nextflow.dag.DotRenderer
import nextflow.dag.GexfRenderer
@@ -46,7 +47,11 @@ class GraphObserver implements TraceObserver {

private Path file

private DAG dag
private String graphType

private DAG processGraph

private TaskGraph taskGraph

private String name

@@ -67,7 +72,8 @@

@Override
void onFlowCreate(Session session) {
this.dag = session.dag
this.processGraph = session.dag
this.taskGraph = session.taskGraph
// check file existance
final attrs = FileHelper.readAttributes(file)
if( attrs ) {
@@ -80,14 +86,22 @@

@Override
void onFlowComplete() {
// -- normalise the DAG
dag.normalize()
// -- render it to a file
createRender().renderDocument(dag,file)
if( graphType == 'process' ) {
// -- normalise the DAG
processGraph.normalize()
// -- render it to a file
createRenderer().renderProcessGraph(processGraph,file)
}
else if( graphType == 'task' ) {
createRenderer().renderTaskGraph(taskGraph,file)
}
else {
log.warn("Invalid DAG type '${graphType}'")
}
}

@PackageScope
DagRenderer createRender() {
DagRenderer createRenderer() {
if( format == 'dot' )
new DotRenderer(name)

Expand All @@ -104,28 +118,6 @@ class GraphObserver implements TraceObserver {
new GraphvizRenderer(name, format)
}


@Override
void onProcessCreate(TaskProcessor process) {

}


@Override
void onProcessSubmit(TaskHandler handler, TraceRecord trace) {

}

@Override
void onProcessStart(TaskHandler handler, TraceRecord trace) {

}

@Override
void onProcessComplete(TaskHandler handler, TraceRecord trace) {

}

@Override
boolean enableMetrics() {
return false
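
Taken together, NodeMarker.addTaskNode() populates the task graph as tasks are resumed from the cache or submitted, and GraphObserver.onFlowComplete() dispatches on dag.type to decide which graph to render. Since only MermaidRenderer overrides renderTaskGraph() in this change, a task-level DAG would typically be written to a Mermaid file, e.g. by running nextflow run main.nf -with-dag flowchart.mmd (assuming, as in upstream Nextflow, that the .mmd extension selects the Mermaid renderer); any other output format falls through to the UnsupportedOperationException default in DagRenderer.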