我们为什么需要标准的工作流描述语言
在构造生物信息工作流时,如果不使用标准工作流描述语言,我们一般是使用脚本语言来构造:
- 直接采用shell、R、Python 等编写脚本,封装基因计算软件的命令行
- 多个脚本组成一个工作流
- 执行时输入参数执行命令行
这种方式构造的工作流,在使用过程中可能面临下面的挑战:
- 不同的基因流程可能使用不同的脚本语言、对使用者的计算机技能要求较高
- 脚本类的工作流的执行,无专业的解析引擎,运行状态无法监控
- 脚本健壮性要求高,如果错误处理没有做好,可能造成时间和资源的浪费
如果您也遇到了上面的痛点,那么您急需要引入标准的工作流来规范流程。比如 WDL 就是个很好的选择。
WDL
什么是 WDL
WDL 是 Workflow Description Language的缩写,有时也写作 Workflow Definition Language,是美国 Broad Institute 推出的工作流描述语言。
经过几年的发展,WDL 已经是生信行业广泛接受的一种工作流标准,具有下面的优势:
- Human-readable
WDL 作为一种为工作流领域定制的语言,和 Shell、Python 等通用的脚本语言相比,没有过多复杂的概念,对使用者的计算机技能要求不高,对于生信用户容易上手。 - Portable Workflow
WDL 可以在多个平台执行,比如本地服务器、SGE 集群,云计算平台等,可以做到一次编写多处执行。 - Standard
作为GA4GH支持的工作流描述语言之一,已经得到了众多大厂和行业协会的支持,形成了比较完善的生态。
从 Hello world 开始编写WDL
像我们学习其他语言一样,先从经典的 hello world 开始,学习 WDL 的编写
task echo {
String out
command {
echo Hello World! > ${out}
}
runtime {
cluster: "OnDemand ecs.sn1.medium img-ubuntu-vpc"
docker: "registry.cn-shanghai.aliyuncs.com/mynamespace/myubuntu:0.1"
}
output {
File outFile = "${out}"
}
}
workflow wf_echo {
call echo
output {
echo.outFile
}
}
上面的例子是一个简单的 WDL,作用是输出 Hello world
并保存在一个文件里面,输出文件名可以指定。一个完整的 WDL 一般由下面几个部分组成:
- workflow:工作流定义
- task:工作流包含的任务定义
- call:调用或触发工作流里面的 task 执行
- command:task在计算节点上要执行的命令行
- runtime:task在计算节点上的运行时参数,包括 CPU、内存、docker 镜像等
- output:task 或 workflow 的输出定义
使用变量
我们要把一个处理步骤构造成一个 task, 就要封装计算软件的命令行,那么命令行的参数如何传入呢?输出文件的名字如何指定呢?这些问题在 WDL 中可以通过变量来解决。比如 Hello world
例子中的 String out
就是一个字符串类型的输入,用于指定输出文件的名字。WDL 中的变量可以定义在 workflow 中,也可以定义在 task中。在command 和 output 中可以通过$和{}的方式来引用变量。
变量的类型主要有以下几种:
- String
- Int
- Float
- File
- Boolean
- Array[T]
- Map[K, V]
- Pair[X, Y]
- Object
关于每一种变量的使用,以及 WDL 的更多使用技巧,请参考官方规范文档。
task 如何组装成 workflow
一个 workflow 里面包含多个 task,task 之前的串行或并行关系如何表达呢?主要有下面三种情况:
Linear Chaining
第一种是最常见的场景,简单的线性串联,多个 task 依次执行,前面步骤的输出作为后面步骤的输入,最后一个 task 的输出作为整个 workflow 的输出。
Multi-input / Multi-output
第二种是多输入多输出的场景,一个 task 可以定义多个输入和输出,比如上面的例子,task B 有两个输出,作为 taskC 的输入。
Scatter-Gather Parallelism
第三种场景是用于 task 的并发执行。如果一个 task 有多个样本需要并发处理,可以使用数组的方式将样本传入,然后使用 scatter 并发的处理每个样本,每个执行的单元称为一个 shard。所有的 shard 执行完成,则当前 task 执行完成,所有 shard 的输出,又作为一个数组,可以传递到下一个 task 处理。
输入参数如何传入
workflow 的输入,比如基因样本的存储位置、计算软件的命令行参数、计算节点的资源配置等,可以通过 json 文件的形式来指定。使用 wdltools 工具可以根据 WDL 文件来生成输入模板:
模板格式如下:
当然,如果工作流不是很复杂,也可以按照上面的格式手写 input 文件。下面是一个 GATK 工作流的 input 文件的片段:
实际的例子
使用 GATK 构建的Jointcalling
workflow定义
task 定义:HaplotypeCallerERC
task定义 GenotypeGVCFs
工作流解析
- 整个 workflow 由2个 task 组成
- Task1 通过 Scatter 并发处理多份样本,得到一组 vcf 文件
- Task2 处理 Task1 输出的一组 vcf 文件,得到最终的workflow 输出
一个稍微复杂的例子--使用 GATK 做外显子分析
工作流定义
task定义:UnmappedBamToAlignedBam
工作流解析
- 整个 Workflow 由5个 task 组成
- Task 之间通过 Linear Chaining 的方式组合
- 每个 Task 是子 Workflow,由多个 Task 组合而成。也就是说 WDL 支持嵌套,workflow 里面的任务,既可以是一个 task,也可以是一个完整的 workflow,这个 workflow 被称为sub workflow。更多关于嵌套的用法请参考官方规范文档。
WDL 怎么运行
执行引擎 Cromwell
Cromwell 是 Broad Institute 开发的工作流管理引擎。具有如下的优势:
- 支持 WDL 和 CWL 两种工作流描述语言
- 多平台支持,包括本地服务器、SGE集群、云计算平台等
- 阿里云批量计算是官方支持的云平台之一
- 丰富的元数据,展示工作流执行过程
- 支持多种高级特性,优化 workflow 的执行
使用 Cromwell 运行 WDL
使用 Cromwell 运行 WDL 有两种模式
-
Run 模式
用来执行单个 WDL,适用于调试初期,快速执行一个WDL。$ java -jar cromwell.jar run echo.wdl --inputs input.json
-
Server 模式
用下面的命令启动一个 HTTP server$ java -Dconfig.file=application.conf -jar cromwell.jar server
再使用 RESTful API 提交工作流到 server 执行:
$ java -jar cromwell.jar submit -t wdl -i input.json -o option.json -h http://localhost:8000
相比 Run 模式,Server 模式有以下优势:
- 可以并行处理多个 workflow,适用于生产环境
- 有 Call caching 等高级特性(下文会讲到),优化 workflow 的执行
- 提供丰富的 workflow metadata,来展示 workflow 的执行过程
注意:不管是使用Run 模式还是 Server模式,要使用批量计算作为后端运行 WDL,都需要对应的配置文件支持,配置文件详解请参考批量计算官方文档或Cromwell 官方文档。
工具和支持
编辑工具及插件支持
上图的三个主流编辑器或 IDE 都有 WDL 的官方插件,支持语法高亮。
语法检查工具
WDL 编写完成后,在真正执行之前,我们可以使用官方工具进行语法检查:
$ java -jar wdltool.jar validate myWorkflow.wdl
Broad WDL 论坛
在使用 WDL 和 Cromwell 的过程中,如果遇到问题,可以到 Broad WDL 官方论坛寻求帮助,常见的问题,在论坛都可以找到答案。
WDL 参考样例
在学习编写 WDL 的过程中,可以参考 Broad 官方的一些 GATK 工作流,借鉴和学习 WDL 的用法。
WDL + Cromwell 在阿里云批量计算的最佳实践
批量计算服务云上基因计算
批量计算是阿里云上的一种适用于大规模并行批处理作业的分布式云服务。BatchCompute可支持海量作业并发规模,系统自动完成资源管理,作业调度和数据加载,并按实际使用量计费,是一种很适合基因计算的云计算平台,当前是 Cromwell 官方支持的云计算平台后端之一。
Cromwell + 批量计算实现 WDL 流程
上图是使用批量计算实现 WDL 流程的总架构图:
- 用户使用 WDL 构造 工作流
- 使用命令行工具提交 WDL 到 Cromwell server
- Cromwell server 解析 WDL 并将 task 转化为批量计算的作业提交
- 批量计算作业提交会后根据作业数量动态的申请计算资源
- 作业开始运行后,会从 OSS 对象存储读取测试数据,计算完成后会将结果上传到 OSS
- 针对基因数据分发中数据拷贝的痛点,推出了对应的分发解决方案:CCP(内容协作平台),生信分析公司通过CCP实现对象存储的目录-Drive的映射,实现租户级分目录管理,管理不同的测序公司访问权限,生物样本交付给第三方测序公司测序,测序公司直接上传数据到网盘,实现云上交付
Cromwell On 批量计算
批量计算作为官方支持的云计算后端,在存储、计算等方面做了大量的适配和优化。
支持的高级特性
从官方版本45开始,Cromwell 使用批量计算作为后端,支持 glob 和 Call caching 两个高级特性。
glob
glob 是指对 workflow 或 task 的输出,支持通配符匹配。
output {
Array[File] output_bams = glob("*.bam")
}
- 使用场景
输出文件有多个,且文件名不确定 - 使用方法
采用 glob 表达式,用 array 方式存储多个输出文件 - 价值
输出结果支持通配符匹配,简化 WDL 编写,采用数组方式,方便并发处理
Call caching
Call caching 是 Cromwell 的一个很有用的高级特性,通过 task 的复用,帮助客户节省时间,节省成本。
- 适用场景
输入和运行环境不变的情况下,复用之前 task 的运行结果 - 命中条件
输入 + 运行时参数相同 - 价值
复用之前的执行结果,节省时间,节省成本
例如上图的 workflow,一共有4个 task,当执行到第三个 task 时,由于参数错误失败了。当我们排除了错误之后,再次提交 workflow,Crowell 会检测到前两个 task 已经执行过,则直接复用其结果,从 task3 开始继续执行,从而达到节省时间,节省成本的目的。
关于 Call caching 的配置和使用,请参考批量计算官方文档。
批量计算 runtime
使用批量计算作为后端时,主要的 runtime 参数有:
-
cluster
- 计算集群环境
- 支持serverless 模式和固定集群模式
-
mounts
- 挂载设置
- 支持 OSS 和 NAS
-
docker
- 容器镜像地址
- 支持容器镜像服务
-
systemDisk
- 系统盘设置
- 包括磁盘类型和磁盘大小
-
dataDisk
- 数据盘设置
- 包括磁盘类型、磁盘大小和挂载点
-
timeout
- 作业超时时间
具体的参数解释及填写方法,请参考 Cromwell 官方文档。
启动 Cromwell server
为了方便用户部署 Cromwell server,我们推出了免费的 Cromwell server ECS 镜像,内置 Cromwell 运行需要的 java 环境、docker 工具、mysql 镜像及配置文件模板等,使用一键启动工具,就可以启停 Cromwell server:
$ cd /home/cromwell/docker-compose-mysql/
#初次部署,使用init来初始化配置并启动服务
$ ./server.sh init --id=xxxx --key=xxxx --root=oss://my-bucket/cromwell_dir --instance=ecs.sn1.medium --image=img-ubuntu-vpc
#停止服务
$ ./server.sh stop
#再次启动服务
$ ./server.sh start
服务启动后,可以使用镜像内置的命令行工具 widdler 来提交和查看 workflow。widdler命令行集成了Crowmell server 和批量计算服务,可以提供如下功能:
- 支持工作流的提交、查询等操作
- 支持工作流对应的批量计算日志的查询
- 支持失败工作流快速定位原因
- 支持workflow 级别的 metric 统计
- 支持workflow 级别的 billing 统计