Nextflow patterns
1 Basic Patterns
1.1 Channel duplication
P:需要在两个或多个进程中使用相同的通道作为输入
S:使用into运算符创建源通道的两个(或更多)副本。然后,使用新通道作为流程的输入。
代码:
Channel
.fromPath('prots/*_?.fa')
.into { prot1_ch; prot2_ch }
process foo {
input: file x from prot1_ch
script:
"""
echo your_command --input $x
"""
}
process bar {
input: file x from prot2_ch
script:
"""
your_command --input $x
"""
}
2 Scatter executions
2.1 Process per file path
P:需要为每个匹配 glob 模式的文件执行一个任务
S:使用Channel.fromPath方法创建一个通道,发出与 glob 模式匹配的所有文件。然后,使用通道作为执行任务的流程的输入。
代码:
Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }
process foo {
input:
file x from samples_ch
script:
"""
your_command --input $x
"""
}
2.2 Process per file chunk
P:需要将一个或多个输入文件拆分为块并为每个文件执行一项任务
S:使用splitText运算符将文件拆分为给定大小的块。然后将结果通道用作执行任务的流程的输入
代码:
Channel
.fromPath('poem.txt')
.splitText(by: 5)
.set{ chunks_ch }
process foo {
echo true
input:
file x from chunks_ch
script:
"""
rev $x | rev
"""
}
2.3 Process per file pairs
P:需要将文件处理到按对分组的目录中
S:使用Channel.fromFilePairs方法创建一个通道,该通道发出与 glob 模式匹配的文件对。该模式必须匹配成对文件名中的公共前缀。匹配文件作为元组发出,其中第一个元素是匹配文件的分组键,第二个元素是文件对本身。
代码:
Channel
.fromFilePairs('reads/*_{1,2}.fq.gz')
.set { samples_ch }
process foo {
input:
set sampleId, file(reads) from samples_ch
script:
"""
your_command --sample $sampleId --reads $reads
"""
}
- 自定义分组策略
需要时,可以定义自定义分组策略。一个常见的用例是对齐 BAM 文件 ( sample1.bam
) 随附的索引文件。困难在于索引有时会被调用sample1.bai
,有时sample1.bam.bai
取决于所使用的软件。下面的例子可以适应这两种情况。
代码:
Channel
.fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }
.set { samples_ch }
process foo {
input:
set sampleId, file(bam) from samples_ch
script:
"""
your_command --sample $sampleId --bam ${sampleId}.bam
"""
}
2.4 Process per file range
P:需要在具有共同索引范围的两个或更多系列文件上执行任务
S:使用from方法定义重复执行任务的范围,然后将其与map运算符链接以将每个索引与相应的输入文件相关联。最后使用结果通道作为过程的输入
代码:
Channel
.from(1..23)
.map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
.set { pairs_ch }
process foo {
tag "$sampleId"
input:
set sampleId, file(indels), file(snps) from pairs_ch
"""
echo foo_command --this $indels --that $snps
"""
}
2.5 Process per CSV record
P:需要为一个或多个 CSV 文件中的每条记录执行一项任务
S:使用splitCsv运算符逐行读取 CSV 文件,然后使用map运算符返回每行所需字段的元组,并使用该file
函数将任何字符串路径转换为文件路径对象。最后使用结果通道作为过程的输入
index.csv
sampleId | read 1 | read2 |
---|---|---|
FC816RLABXX | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz | read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz |
FC812MWABXX | read/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz | read110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz |
FC81DE8ABXX | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz | read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz |
FC81DB5ABXX | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz | read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz |
FC819P0ABXX | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz | read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz |
代码:
params.index = 'index.csv'
Channel
.fromPath(params.index)
.splitCsv(header:true)
.map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
.set { samples_ch }
process foo {
input:
set sampleId, file(read1), file(read2) from samples_ch
script:
"""
echo your_command --sample $sampleId --reads $read1 $read2
"""
}
2.6 Process per file output
P:工作流中的任务一次生成两个或更多文件。下游任务需要独立处理这些文件中的每一个
S:使用flatten运算符将上游进程的输出转换为单独发送到每个文件的通道。然后将此通道用作下游过程的输入
代码:
process foo {
output:
file '*.txt' into foo_ch
script:
'''
echo Hello there! > file1.txt
echo What a beautiful day > file2.txt
echo I wish you are having fun1 > file3.txt
'''
}
process bar {
input:
file x from foo_ch.flatten()
script:
"""
cat $x
"""
}
3 Gather results
3.1 Process all outputs altogether
P:需要完全处理上游任务的所有输出
S:使用collect运算符收集上游任务产生的所有输出,并将它们作为唯一输出发出。然后使用结果通道作为过程的输入输入
代码:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }
process foo {
input:
file x from samples_ch
output:
file 'file.fq' into unzipped_ch
script:
"""
< $x zcat > file.fq
"""
}
process bar {
echo true
input:
file '*.fq' from unzipped_ch.collect()
"""
cat *.fq
"""
}
3.2 Process outputs into groups
P:需要在同一批次中处理文件名中具有匹配键的所有文件
S:使用map运算符将每个文件关联一个从文件名中提取的键。然后将结果通道与groupTuple运算符链接起来,将所有具有匹配键的文件组合在一起。最后使用结果通道作为过程的输入。
代码:
Channel
.fromPath('reads/*')
.map { file ->
def key = file.name.toString().tokenize('_').get(0)
return tuple(key, file)
}
.groupTuple()
.set{ groups_ch }
process foo {
input:
set key, file(samples) from groups_ch
script:
"""
echo your_command --batch $key --input $samples
"""
}
3.3 Collect outputs into a file
P:需要将上游进程生成的所有输出文件连接到一个文件中
S:使用collectFile运算符将所有输出文件合并为一个文件
代码:
Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }
process foo {
input:
file x from samples_ch
output:
file 'file.fq' into unzipped_ch
script:
"""
< $x zcat > file.fq
"""
}
unzipped_ch
.collectFile()
.println{ it.text }
4 Organize outputs
4.1 Store process outputs
P:需要将一个或多个进程的输出存储到您选择的目录结构中
S:使用publishDir指令设置一个自定义目录,在该目录中需要提供流程输出
代码:
params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'
Channel.fromFilePairs(params.reads).set{ samples_ch }
process foo {
publishDir "$params.outdir/$sampleId"
input:
set sampleId, file(samples) from samples_ch
output:
file '*.fq'
script:
"""
< ${samples[0]} zcat > sample_1.fq
< ${samples[1]} zcat > sample_2.fq
"""
}
4.2 Store outputs matching a glob pattern
P:工作流中的任务会创建下游任务所需的许多输出文件。可根据文件名将其中一些文件存储到单独的目录中
S:使用两个或多个publishDir指令将输出文件存储到单独的存储路径中。对于每个指令,使用选项指定一个不同的 glob 字符串,pattern
仅将与提供的模式匹配的文件存储到每个目录中
代码:
Channel
.fromFilePairs(params.reads, flat: true)
.set{ samples_ch }
process foo {
publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"
publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'
publishDir "$params.outdir/$sampleId/", pattern: '*.fq'
input:
set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_ch
output:
file "*"
script:
"""
< sample1.fq.gz zcat > sample1.fq
< sample2.fq.gz zcat > sample2.fq
awk '{s++}END{print s/4}' sample1.fq > sample1_counts.txt
awk '{s++}END{print s/4}' sample2.fq > sample2_counts.txt
head -n 50 sample1.fq > sample1_outlook.txt
head -n 50 sample2.fq > sample2_outlook.txt
"""
}
4.3 Rename process outputs
P:需要将进程的输出存储到一个目录中,为文件指定一个您选择的名称
S:publishDir 允许在过程输出存储在所选择的目录。指定saveAs
参数为每个文件提供您选择的名称,证明自定义规则作为闭包(closure)
代码:
process foo {
publishDir 'results', saveAs: { filename -> "foo_$filename" }
output:
file '*.txt'
'''
touch this.txt
touch that.txt
'''
}
- 若保存在子目录中:
可以使用相同的模式将特定文件存储在不同的目录中,具体取决于实际名称
代码:
process foo {
publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }
output:
file '*'
'''
touch this.txt
touch that.zip
'''
}
5 Other
5.1 Get process work directory
P:需要当前任务工作目录的显式路径
S:使用$PWD
Bash 变量或pwd
命令检索任务工作目录路径
代码:
process foo {
echo true
script:
"""
echo foo task path: \$PWD
"""
}
process bar {
echo true
script:
'''
echo bar task path: $PWD
'''
}
注意:$
当命令脚本包含在双引号字符中时,请确保使用转义变量占位符,如上在双引号中的$
前加上了\
来进行了转义
- 使用以下命令为相同的脚本提供一些输入文件,以防止进程被执行
nextflow run patterns/process-get-workdir.nf --inputs ../data/prots/\*
5.2 Ignore failing process
P:预期任务在特定条件下会失败,由此希望忽略失败并继续执行工作流中的剩余任务
S:使用 process指令 errorStrategy 'ignore'
忽略错误条件
代码:
process foo {
errorStrategy 'ignore'
script:
'''
echo This is going to fail!
exit 1
'''
}
process bar {
script:
'''
echo OK
'''
}
5.3 Mock dependency
P:需要同步两个没有直接输入输出关系的进程bar
的执行,以便该进程仅在 process 完成后执行foo
S:将foo
产生标志值的通道添加到进程的输出中。然后将此通道用作进程的输入以bar
在其他进程完成时触发其执行
代码:
Channel
.fromPath('.data/reads/*.fq.gz')
.set{ reads_ch }
process foo {
output:
val true into done_ch
script:
"""
your_command_here
"""
}
process bar {
input:
val flag from done_ch
file fq from reads_ch
script:
"""
other_commad_here --reads $fq
"""
}
6 Advanced patterns
6.1 Conditional resources definition
P:工作流中的任务需要使用一定量的计算资源,例如。内存取决于一个或多个输入文件的大小或名称。
S:使用闭包以动态方式声明资源需求,例如memory
,cpus
等。闭包使用size
流程定义中声明的输入的文件属性(例如等),来计算所需的资源量。
代码:
Channel
.fromPath('reads/*_1.fq.gz')
.set { reads_ch }
process foo {
memory { reads.size() < 70.KB ? 1.GB : 5.GB }
input:
file reads from reads_ch
"""
your_command_here --in $reads
"""
}
6.2 Conditional process executions
P:两个不同的任务需要以互斥的方式执行,那么第三个任务应该对前一次执行的结果进行后处理。
S:使用when语句有条件地执行两个不同的进程。每个进程声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入。
代码:
params.flag = false
process foo {
output:
file 'x.txt' into foo_ch
when:
!params.flag
script:
'''
echo foo > x.txt
'''
}
process bar {
output:
file 'x.txt' into bar_ch
when:
params.flag
script:
'''
echo bar > x.txt
'''
}
process omega {
echo true
input:
file x from foo_ch.mix(bar_ch)
script:
"""
cat $x
"""
}
可根据flag的不同来选择不同的进程执行,执行foo
和omega
:
nextflow run patterns/conditional-process.nf
执行bar
和omega
:
nextflow run patterns/conditional-process.nf --flag
结果:
peng@sin-try2:~/patterns$ nextflow run conditional-process.nf
N E X T F L O W ~ version 21.04.0-edge
Launching `conditional-process.nf` [soggy_hypatia] - revision: 1b07ba0b38
executor > local (2)
[82/ea867d] process > foo [100%] 1 of 1 ✔
[- ] process > bar -
[38/be26fb] process > omega (1) [100%] 1 of 1 ✔
foo
peng@sin-try2:~/patterns$ nextflow run conditional-process.nf --flag
N E X T F L O W ~ version 21.04.0-edge
Launching `conditional-process.nf` [astonishing_goldwasser] - revision: 1b07ba0b38
executor > local (2)
[- ] process > foo -
[7f/158b8c] process > bar [100%] 1 of 1 ✔
[05/b06073] process > omega (1) [100%] 1 of 1 ✔
bar
- 有条件地正常(使用数据)或作为空通道创建输入 通道。使用各个输入通道的过程仅在通道被填充时才会执行。每个进程仍然声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入
代码:
params.flag = false
(foo_inch, bar_inch) = ( params.flag
? [ Channel.empty(), Channel.from(1,2,3) ]
: [ Channel.from(4,5,6), Channel.empty() ] )
process foo {
input:
val(f) from foo_inch
output:
file 'x.txt' into foo_ch
script:
"""
echo $f > x.txt
"""
}
process bar {
input:
val(b) from bar_inch
output:
file 'x.txt' into bar_ch
script:
"""
echo $b > x.txt
"""
}
process omega {
echo true
input:
file x from foo_ch.mix(bar_ch)
script:
"""
cat $x
"""
}
运行结果:
peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf
N E X T F L O W ~ version 21.04.0-edge
Launching `conditional-process2.nf` [naughty_minsky] - revision: 296937c5d2
executor > local (6)
[0b/8183d2] process > foo (3) [100%] 3 of 3 ✔
[- ] process > bar -
[e8/e9334f] process > omega (3) [100%] 3 of 3 ✔
6
5
4
peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf --flag
N E X T F L O W ~ version 21.04.0-edge
Launching `conditional-process2.nf` [disturbed_goldwasser] - revision: 296937c5d2
executor > local (6)
[- ] process > foo -
[aa/907692] process > bar (3) [100%] 3 of 3 ✔
[c1/e15b2f] process > omega (3) [100%] 3 of 3 ✔
1
2
3
6.3 Skip process execution
P:工作流程中有两个连续的任务,当指定了可选标志时,不应执行第一个任务,其输入由第二个任务处理。
S:使用条件表达式中创建的空通道,在指定可选参数时跳过第一个执行流程。然后将第二进程的输入定义为第一进程的输出(当被执行时)与输入信道的mix
代码:
params.skip = false
params.input = "$baseDir/sample.fq.gz"
Channel.fromPath(params.input).set{ input_ch }
(foo_ch, bar_ch) = ( params.skip
? [Channel.empty(), input_ch]
: [input_ch, Channel.empty()] )
process foo {
input:
file x from foo_ch
output:
file('*.fastq') into optional_ch
script:
"""
< $x zcat > ${x.simpleName}.fastq
"""
}
process bar {
echo true
input:
file x from bar_ch.mix(optional_ch)
"""
echo your_command --input $x
"""
}
6.4 Feedback loop☆
P:需要重复执行一个或多个任务,使用输出作为新迭代的输入,直到达到特定条件(i=o=i…)
S:使用迭代循环中最后一个进程的输出作为第一个进程的输入。使用Channel.create方法显式创建输出通道。然后将过程输入定义为初始输入和过程输出的mix,该过程输出应用于定义终止条件的until运算符
代码;
params.input = 'hello.txt'
condition = { it.readLines().size()>3 }
feedback_ch = Channel.create()
input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )
process foo {
input:
file x from input_ch
output:
file 'foo.txt' into foo_ch
script:
"""
cat $x > foo.txt
"""
}
process bar {
input:
file x from foo_ch
output:
file 'bar.txt' into feedback_ch
file 'bar.txt' into result_ch
script:
"""
cat $x > bar.txt
echo World >> bar.txt
"""
}
result_ch.last().println { "Result:\n${it.text.indent(' ')}" }
6.5 Optional input
P:一个或多个进程有一个可选的输入文件
S:使用特殊的文件名来标记文件参数的缺失
代码:
params.inputs = 'prots/*{1,2,3}.fa'
params.filter = 'NO_FILE'
prots_ch = Channel.fromPath(params.inputs)
opt_file = file(params.filter)
process foo {
input:
file seq from prots_ch
file opt from opt_file
script:
def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''
"""
your_commad --input $seq $filter
"""
}
6.6 Optional output
P:在某些情况下,工作流中的任务预计不会创建输出文件
S:将此类输出声明为optional
文件
代码:
process foo {
output:
file 'foo.txt' optional true into foo_ch
script:
'''
your_command
'''
}
6.7 Execute when empty
P:如果通道为空,您需要执行一个过程
S:使用ifEmpty运算符发出标记值以触发流程的执行
代码:
process foo {
input:
val x from ch.ifEmpty { 'EMPTY' }
when:
x == 'EMPTY'
script:
'''
your_command
'''
}