编译:
诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作。
使用Spark SQL 运行大规模基因组工作流
在过去十年中,随着基因组测序价格下降,可用基因组数据的数量逐渐激增。研究人员现在已经能够从英国生物银行等项目的数十万人群中探测遗传变异和疾病之间的关联。这些分析将使人们更深入地了解疾病的根本原因,从而治疗当今一些主要的疾病问题。但是,目前用来分析这些数据集的工具还没有跟上数据增长的步伐。
许多用户习惯于使用命令行工具(如plink或单节点Python和R脚本)来处理基因组数据。但是,单节点工具暂时还不足以达到TB级甚至更高级别的程度。 目前Broad研究所的Hail项目建立在Spark之上,可以将计算分配到多个节点,但它要求用户除了Spark之外还要学习新的API,并鼓励数据以特定于Hail的文件格式存储。由于基因组数据不是孤立地保持其价值,而是针对结合不同数据来源(例如医疗记录,保险索赔和医学图像)进行分析后的一种输入,因此单独的系统可能导致严重的并发症。
我们相信Spark SQL已经成为处理各种不同风格的大量数据集的标准,代表了通向简单、可扩展的基因组工作流程的最直接途径。 Spark SQL用于以分布式方式来对(ETL)大数据进行提取,转换和加载。 ETL是生物信息学所涉及工作的90%,从提取突变,用外部数据源注释,到为下游统计和机器学习分析做准备。 Spark SQL包含Python或R等语言的高级API,这些API易于学习,并且比传统的生物信息学方法更容易阅读和维护代码。
在这篇文章中,我们将介绍在基因组数据和Spark SQL之间提供强大及灵活连接的数据Reader和Writer。
数据读取
Reader是Spark SQL中的数据源,因此VCF和BGEN可以像任何其他文件类型一样被读入Spark
DataFrame。 在Python中,VCF文件的目录读取如下所示:
.format("com.databricks.vcf")\
.option("includeSampleIds", True)\
.option("flattenInfoFields", True)\
.load("/databricks-datasets/genomics/1kg-vcfs")
VCF头中定义的数据类型将转换为输出DataFrame的架构。 此示例中的VCF文件包含许多成为可查询字段的注释:
Spark SQL DataFrame 中VCF文件的内容
适用于群组中每个样本的字段(如被调用的基因型)存储在一个数组中,这样可以为每个站点的所有样本实现快速聚合。
每个样本基因型字段的数组
由于那些使用VCF文件的人都非常清楚,VCF规范为数据格式化带来了歧义,这可能导致工具以意想不到的方式失败。 我们的目标是创建一个强大的解决方案,默认情况下接受格式错误的记录,然后允许我们的用户选择过滤条件。 例如,我们的一位客户使用我们的读者来摄取有问题的文件,其中一些概率值被存储为“nan”而不是“NaN”,这是大多数基于Java的工具所需要的。 自动处理这些简单问题使我们的用户可以专注于了解他们的数据意味着什么,而不是数据是否为正确格式化的。 为了验证我们读者的稳健性,我们针对由GATK和Edico Genomics等常用工具生成的VCF文件以及数据共享计划中的文件对其进行了测试。
诸如英国生物银行倡议分发的BGEN文件也可以类似地处理。 读取BGEN文件的代码与我们的VCF示例几乎完全相同:
spark.read.format("com.databricks.bgen").load(bgen_path)
这些文件读取器生成兼容的模式,允许用户编写适用于不同变异数据源的管道,并支持合并不同的基因组数据集。 例如,VCF阅读器可以获取具有不同INFO字段的文件目录,并返回包含公共字段的单个DataFrame。 以下命令读入BGEN和VCF文件中的数据并合并它们以创建单个数据集:
bgen_df = spark.read.format(“com.databricks.bgen”)\
.schema(vcf_df.schema).load(bgen_path)
big_df = vcf_df.union(bgen_df) # All my genotypes!!
由于我们的文件阅读器返回vanilla Spark SQL DataFrames,您可以使用Spark支持的任何编程语言(如Python,R,Scala,Java或纯SQL)来提取变体数据。 专门的前端API,例如Koalas,它在Apache Spark上实现了pandas数据帧API,并且还可以无缝地工作。
基因组数据的使用
由于每个变体级注释(VCF中的INFO字段)对应于DataFrame列,因此查询可以轻松访问这些值。 例如,我们可以计算小等位基因频率小于0.05的双等位基因变体的数量:
Spark 2.4引入了高阶函数,简化了对数组数据的查询。 我们可以利用此功能来操纵基因型阵列。 要过滤基因型数组,使其仅包含至少有一个变异等位基因的样本,我们可以编写如下查询:
使用更高阶函数操作基因型数组
如果您有VCF文件的tabix索引,我们的数据源会将基因组轨迹上的过滤器推送到索引并最大限度地降低I / O成本。 即使数据集增长超出单个机器可以支持的大小,简单查询仍然以交互速度完成。
正如我们在讨论摄取变体数据时所提到的,Spark支持的任何语言都可用于编写查询。 以上语句可以组合成单个SQL查询:
使用SQL查询VCF文件
数据导出
我们相信,在不久的将来,组织将使用Delta Lake等技术存储和管理基因组数据,就像处理其他数据类型一样。 但是,我们知道向后兼容熟悉的文件格式以便与协作者共享或使用传统工具非常重要。
我们可以在我们的过滤示例的基础上创建一个块gzip压缩文件,其中包含等位基因频率小于5%的所有变体:
.orderBy(“contigName”, “start”)\
.write.format(“com.databricks.bigvcf”)\
.save(“output.vcf.bgz”)
此命令并行对输出VCF的每个段进行排序,序列化和上载,因此您可以安全地输出群组规模的VCF。 每个染色体或甚至更小的粒度也可以输出一个VCF。
将相同数据保存到BGEN文件只需要对代码进行一次小修改:
.orderBy(“contigName”, “start”)\
.write.format(“com.databricks.bigbgen”)\
.save(“output.bgen”)
本文作者:阿里云E-MapReduce团队