Nextflow patterns

Nextflow patterns

1 Basic Patterns

1.1 Channel duplication

P:需要在两个或多个进程中使用相同的通道作为输入

S:使用into运算符创建源通道的两个(或更多)副本。然后,使用新通道作为流程的输入。

代码:

Channel
    .fromPath('prots/*_?.fa')
    .into { prot1_ch; prot2_ch }

process foo {
  input: file x from prot1_ch
  script:
  """
    echo your_command --input $x
  """
}

process bar {
  input: file x from prot2_ch
  script:
  """
    your_command --input $x
  """
}

2 Scatter executions

2.1 Process per file path

P:需要为每个匹配 glob 模式的文件执行一个任务

S:使用Channel.fromPath方法创建一个通道,发出与 glob 模式匹配的所有文件。然后,使用通道作为执行任务的流程的输入。

代码:

Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }

process foo {
  input:
  file x from samples_ch

  script:
  """
  your_command --input $x
  """
}

2.2 Process per file chunk

P:需要将一个或多个输入文件拆分为块并为每个文件执行一项任务

S:使用splitText运算符将文件拆分为给定大小的块。然后将结果通道用作执行任务的流程的输入

代码:

Channel
    .fromPath('poem.txt')
    .splitText(by: 5)
    .set{ chunks_ch }

process foo {
  echo true
  input:
  file x from chunks_ch

  script:
  """
  rev $x | rev
  """
}

2.3 Process per file pairs

P:需要将文件处理到按对分组的目录中

S:使用Channel.fromFilePairs方法创建一个通道,该通道发出与 glob 模式匹配的文件对。该模式必须匹配成对文件名中的公共前缀。匹配文件作为元组发出,其中第一个元素是匹配文件的分组键,第二个元素是文件对本身。

代码:

Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(reads) from samples_ch

  script:
  """
  your_command --sample $sampleId --reads $reads
  """
}
  • 自定义分组策略

需要时,可以定义自定义分组策略。一个常见的用例是对齐 BAM 文件 ( sample1.bam) 随附的索引文件。困难在于索引有时会被调用sample1.bai,有时sample1.bam.bai取决于所使用的软件。下面的例子可以适应这两种情况。

代码:

Channel
    .fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(bam) from samples_ch

  script:
  """
  your_command --sample $sampleId --bam ${sampleId}.bam
  """
}

2.4 Process per file range

P:需要在具有共同索引范围的两个或更多系列文件上执行任务

S:使用from方法定义重复执行任务的范围,然后将其与map运算符链接以将每个索引与相应的输入文件相关联。最后使用结果通道作为过程的输入

代码:

Channel
  .from(1..23)
  .map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
  .set { pairs_ch }


process foo {
  tag "$sampleId"

  input:
  set sampleId, file(indels), file(snps) from pairs_ch

  """
  echo foo_command --this $indels --that $snps
  """
}

2.5 Process per CSV record

P:需要为一个或多个 CSV 文件中的每条记录执行一项任务

S:使用splitCsv运算符逐行读取 CSV 文件,然后使用map运算符返回每行所需字段的元组,并使用该file函数将任何字符串路径转换为文件路径对象。最后使用结果通道作为过程的输入

index.csv

sampleId read 1 read2
FC816RLABXX read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gz read/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz
FC812MWABXX read/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gz read110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz
FC81DE8ABXX read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gz read/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz
FC81DB5ABXX read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gz read/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz
FC819P0ABXX read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gz read/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz

代码:

params.index = 'index.csv'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(read1), file(read2) from samples_ch

    script:
    """
    echo your_command --sample $sampleId --reads $read1 $read2
    """
}

2.6 Process per file output

P:工作流中的任务一次生成两个或更多文件。下游任务需要独立处理这些文件中的每一个

S:使用flatten运算符将上游进程的输出转换为单独发送到每个文件的通道。然后将此通道用作下游过程的输入

代码:

process foo {
  output:
  file '*.txt' into foo_ch
  script:
  '''
  echo Hello there! > file1.txt
  echo What a beautiful day > file2.txt
  echo I wish you are having fun1 > file3.txt
  '''
}

process bar {
  input:
  file x from foo_ch.flatten()
  script:
  """
  cat $x
  """
}

3 Gather results

3.1 Process all outputs altogether

P:需要完全处理上游任务的所有输出

S:使用collect运算符收集上游任务产生的所有输出,并将它们作为唯一输出发出。然后使用结果通道作为过程的输入输入

代码:

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

process bar {
  echo true
  input:
  file '*.fq' from unzipped_ch.collect()
  """
  cat *.fq
  """
}

3.2 Process outputs into groups

P:需要在同一批次中处理文件名中具有匹配键的所有文件

S:使用map运算符将每个文件关联一个从文件名中提取的键。然后将结果通道与groupTuple运算符链接起来,将所有具有匹配键的文件组合在一起。最后使用结果通道作为过程的输入。

代码:

Channel
    .fromPath('reads/*')
    .map { file ->
        def key = file.name.toString().tokenize('_').get(0)
        return tuple(key, file)
     }
    .groupTuple()
    .set{ groups_ch }


process foo {
  input:
  set key, file(samples) from groups_ch

  script:
  """
  echo your_command --batch $key --input $samples
  """
}

3.3 Collect outputs into a file

P:需要将上游进程生成的所有输出文件连接到一个文件中

S:使用collectFile运算符将所有输出文件合并为一个文件

代码:

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

unzipped_ch
      .collectFile()
      .println{ it.text }

4 Organize outputs

4.1 Store process outputs

P:需要将一个或多个进程的输出存储到您选择的目录结构中

S:使用publishDir指令设置一个自定义目录,在该目录中需要提供流程输出

代码:

params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'

Channel.fromFilePairs(params.reads).set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId"
  input:
  set sampleId, file(samples) from samples_ch
  output:
  file '*.fq'

  script:
  """
  < ${samples[0]} zcat > sample_1.fq
  < ${samples[1]} zcat > sample_2.fq
  """
}

4.2 Store outputs matching a glob pattern

P:工作流中的任务会创建下游任务所需的许多输出文件。可根据文件名将其中一些文件存储到单独的目录中

S:使用两个或多个publishDir指令将输出文件存储到单独的存储路径中。对于每个指令,使用选项指定一个不同的 glob 字符串,pattern仅将与提供的模式匹配的文件存储到每个目录中

代码:

Channel
    .fromFilePairs(params.reads, flat: true)
    .set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"
  publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'
  publishDir "$params.outdir/$sampleId/", pattern: '*.fq'

  input:
    set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_ch
  output:
    file "*"
  script:
  """
    < sample1.fq.gz zcat > sample1.fq
    < sample2.fq.gz zcat > sample2.fq

    awk '{s++}END{print s/4}' sample1.fq > sample1_counts.txt
    awk '{s++}END{print s/4}' sample2.fq > sample2_counts.txt

    head -n 50 sample1.fq > sample1_outlook.txt
    head -n 50 sample2.fq > sample2_outlook.txt
  """
}

4.3 Rename process outputs

P:需要将进程的输出存储到一个目录中,为文件指定一个您选择的名称

S:publishDir 允许在过程输出存储在所选择的目录。指定saveAs参数为每个文件提供您选择的名称,证明自定义规则作为闭包(closure)

代码:

process foo {
 publishDir 'results', saveAs: { filename -> "foo_$filename" }

 output:
 file '*.txt'

 '''
 touch this.txt
 touch that.txt
 '''
}
  • 若保存在子目录中:

可以使用相同的模式将特定文件存储在不同的目录中,具体取决于实际名称

代码:

process foo {
 publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }

 output:
 file '*'

 '''
 touch this.txt
 touch that.zip
 '''
}

5 Other

5.1 Get process work directory

P:需要当前任务工作目录的显式路径

S:使用$PWDBash 变量或pwd命令检索任务工作目录路径

代码:

process foo {
  echo true
  script:
  """
  echo foo task path: \$PWD
  """
}

process bar {
  echo true
  script:
  '''
  echo bar task path: $PWD
  '''
}

注意:$当命令脚本包含在双引号字符中时,请确保使用转义变量占位符,如上在双引号中的$前加上了\来进行了转义

  • 使用以下命令为相同的脚本提供一些输入文件,以防止进程被执行
nextflow run patterns/process-get-workdir.nf --inputs ../data/prots/\*

5.2 Ignore failing process

P:预期任务在特定条件下会失败,由此希望忽略失败并继续执行工作流中的剩余任务

S:使用 process指令 errorStrategy 'ignore'忽略错误条件

代码:

process foo {
  errorStrategy 'ignore'
  script:
  '''
    echo This is going to fail!
    exit 1
  '''
}

process bar {
  script:
  '''
  echo OK
  '''
}

5.3 Mock dependency

P:需要同步两个没有直接输入输出关系的进程bar的执行,以便该进程仅在 process 完成后执行foo

S:将foo产生标志值的通道添加到进程的输出中。然后将此通道用作进程的输入以bar在其他进程完成时触发其执行

代码:

Channel
    .fromPath('.data/reads/*.fq.gz')
    .set{ reads_ch }

process foo {
    output:
    val true into done_ch

    script:
    """
    your_command_here
    """
}

process bar {
    input:
    val flag from done_ch
    file fq from reads_ch

    script:
    """
    other_commad_here --reads $fq
    """
}

6 Advanced patterns

6.1 Conditional resources definition

P:工作流中的任务需要使用一定量的计算资源,例如。内存取决于一个或多个输入文件的大小或名称。

S:使用闭包以动态方式声明资源需求,例如memorycpus等。闭包使用size流程定义中声明的输入的文件属性(例如等),来计算所需的资源量。

代码:

Channel
    .fromPath('reads/*_1.fq.gz')
    .set { reads_ch }

process foo {
    memory { reads.size() < 70.KB ? 1.GB : 5.GB }

    input:
    file reads from reads_ch

    """
    your_command_here --in $reads
    """
}

6.2 Conditional process executions

P:两个不同的任务需要以互斥的方式执行,那么第三个任务应该对前一次执行的结果进行后处理。

S:使用when语句有条件地执行两个不同的进程。每个进程声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入。

代码:

params.flag = false

process foo {
  output:
  file 'x.txt' into foo_ch
  when:
  !params.flag

  script:
  '''
  echo foo > x.txt
  '''
}

process bar {
  output:
  file 'x.txt' into bar_ch
  when:
  params.flag

  script:
  '''
  echo bar > x.txt
  '''
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

可根据flag的不同来选择不同的进程执行,执行fooomega

nextflow run patterns/conditional-process.nf

执行baromega

nextflow run patterns/conditional-process.nf --flag

结果:

peng@sin-try2:~/patterns$ nextflow run conditional-process.nf   
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process.nf` [soggy_hypatia] - revision: 1b07ba0b38
executor >  local (2)
[82/ea867d] process > foo       [100%] 1 of 1 ✔
[-        ] process > bar       -
[38/be26fb] process > omega (1) [100%] 1 of 1 ✔
foo

peng@sin-try2:~/patterns$ nextflow run conditional-process.nf --flag
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process.nf` [astonishing_goldwasser] - revision: 1b07ba0b38
executor >  local (2)
[-        ] process > foo       -
[7f/158b8c] process > bar       [100%] 1 of 1 ✔
[05/b06073] process > omega (1) [100%] 1 of 1 ✔
bar
  • 有条件地正常(使用数据)或作为通道创建输入 通道。使用各个输入通道的过程仅在通道被填充时才会执行。每个进程仍然声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入

代码:

params.flag = false

(foo_inch, bar_inch) = ( params.flag
                     ? [ Channel.empty(), Channel.from(1,2,3) ]
                     : [ Channel.from(4,5,6), Channel.empty() ] )

process foo {

  input:
  val(f) from foo_inch

  output:
  file 'x.txt' into foo_ch

  script:
  """
  echo $f > x.txt
  """
}

process bar {
  input:
  val(b) from bar_inch

  output:
  file 'x.txt' into bar_ch

  script:
  """
  echo $b > x.txt
  """
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

运行结果:

peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf 
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process2.nf` [naughty_minsky] - revision: 296937c5d2
executor >  local (6)
[0b/8183d2] process > foo (3)   [100%] 3 of 3 ✔
[-        ] process > bar       -
[e8/e9334f] process > omega (3) [100%] 3 of 3 ✔
6

5

4

peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf --flag
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process2.nf` [disturbed_goldwasser] - revision: 296937c5d2
executor >  local (6)
[-        ] process > foo       -
[aa/907692] process > bar (3)   [100%] 3 of 3 ✔
[c1/e15b2f] process > omega (3) [100%] 3 of 3 ✔
1

2

3

6.3 Skip process execution

P:工作流程中有两个连续的任务,当指定了可选标志时,不应执行第一个任务,其输入由第二个任务处理。

S:使用条件表达式中创建的空通道,在指定可选参数时跳过第一个执行流程。然后将第二进程的输入定义为第一进程的输出(当被执行时)与输入信道的mix

代码:

params.skip = false
params.input = "$baseDir/sample.fq.gz"

Channel.fromPath(params.input).set{ input_ch }

(foo_ch, bar_ch) = ( params.skip
                 ? [Channel.empty(), input_ch]
                 : [input_ch, Channel.empty()] )

process foo {
  input:
  file x from foo_ch

  output:
  file('*.fastq') into optional_ch

  script:
  """
  < $x zcat > ${x.simpleName}.fastq
  """
}

process bar {
  echo true
  input:
  file x from bar_ch.mix(optional_ch)
  """
  echo your_command --input $x
  """
}

6.4 Feedback loop☆

P:需要重复执行一个或多个任务,使用输出作为新迭代的输入,直到达到特定条件(i=o=i…)

S:使用迭代循环中最后一个进程的输出作为第一个进程的输入。使用Channel.create方法显式创建输出通道。然后将过程输入定义为初始输入和过程输出的mix,该过程输出应用于定义终止条件的until运算符

代码;

params.input = 'hello.txt'

condition = { it.readLines().size()>3 }
feedback_ch = Channel.create()
input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )

process foo {
    input:
    file x from input_ch
    output:
    file 'foo.txt' into foo_ch
    script:
    """
    cat $x > foo.txt
    """
}

process bar {
    input:
    file x from foo_ch
    output:
    file 'bar.txt' into feedback_ch
    file 'bar.txt' into result_ch
    script:
    """
    cat $x > bar.txt
    echo World >> bar.txt
    """
}

result_ch.last().println { "Result:\n${it.text.indent(' ')}" }

6.5 Optional input

P:一个或多个进程有一个可选的输入文件

S:使用特殊的文件名来标记文件参数的缺失

代码:

params.inputs = 'prots/*{1,2,3}.fa'
params.filter = 'NO_FILE'

prots_ch = Channel.fromPath(params.inputs)
opt_file = file(params.filter)

process foo {
  input:
  file seq from prots_ch
  file opt from opt_file

  script:
  def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''
  """
  your_commad --input $seq $filter
  """
}

6.6 Optional output

P:在某些情况下,工作流中的任务预计不会创建输出文件

S:将此类输出声明为optional文件

代码:

process foo {
  output:
  file 'foo.txt' optional true into foo_ch

  script:
  '''
  your_command
  '''
}

6.7 Execute when empty

P:如果通道为空,您需要执行一个过程

S:使用ifEmpty运算符发出标记值以触发流程的执行

代码:

process foo {
  input:
  val x from ch.ifEmpty { 'EMPTY' }
  when:
  x == 'EMPTY'

  script:
  '''
  your_command
  '''
}
上一篇:【Python基础】python协程


下一篇:ES6---箭头函数()=>{} 与function的区别