【译】使用Spark SQL 运行大规模基因组工作流

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: https://databricks.com/blog/2019/06/26/scaling-genomic-workflows-with-spark-sql-bgen-and-vcf-readers.html 使用Spark SQL 运行大规模基因组工作流 在过去十年中,随着基因组测序价格下降,可用基因组数据的数量逐渐激增。

https://databricks.com/blog/2019/06/26/scaling-genomic-workflows-with-spark-sql-bgen-and-vcf-readers.html

编译:诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作。

使用Spark SQL 运行大规模基因组工作流

genomics_reader_writer_image3

在过去十年中,随着基因组测序价格下降,可用基因组数据的数量逐渐激增。研究人员现在已经能够从英国生物银行等项目的数十万人群中探测遗传变异和疾病之间的关联。这些分析将使人们更深入地了解疾病的根本原因,从而治疗当今一些主要的疾病问题。但是,目前用来分析这些数据集的工具还没有跟上数据增长的步伐。
 
许多用户习惯于使用命令行工具(如plink或单节点Python和R脚本)来处理基因组数据。但是,单节点工具暂时还不足以达到TB级甚至更高级别的程度。 目前Broad研究所的Hail项目建立在Spark之上,可以将计算分配到多个节点,但它要求用户除了Spark之外还要学习新的API,并鼓励数据以特定于Hail的文件格式存储。由于基因组数据不是孤立地保持其价值,而是针对结合不同数据来源(例如医疗记录,保险索赔和医学图像)进行分析后的一种输入,因此单独的系统可能导致严重的并发症。
 
我们相信Spark SQL已经成为处理各种不同风格的大量数据集的标准,代表了通向简单、可扩展的基因组工作流程的最直接途径。 Spark SQL用于以分布式方式来对(ETL)大数据进行提取,转换和加载。 ETL是生物信息学所涉及工作的90%,从提取突变,用外部数据源注释,到为下游统计和机器学习分析做准备。 Spark SQL包含Python或R等语言的高级API,这些API易于学习,并且比传统的生物信息学方法更容易阅读和维护代码。
在这篇文章中,我们将介绍在基因组数据和Spark SQL之间提供强大及灵活连接的数据Reader和Writer。
 

数据读取

Reader是Spark SQL中的数据源,因此VCF和BGEN可以像任何其他文件类型一样被读入Spark
DataFrame。 在Python中,VCF文件的目录读取如下所示:

spark.read\

  .format("com.databricks.vcf")\

  .option("includeSampleIds", True)\

  .option("flattenInfoFields", True)\

  .load("/databricks-datasets/genomics/1kg-vcfs")

VCF头中定义的数据类型将转换为输出DataFrame的架构。 此示例中的VCF文件包含许多成为可查询字段的注释:

genomics_reader_writer_image1

Spark SQL DataFrame 中VCF文件的内容
 
适用于群组中每个样本的字段(如被调用的基因型)存储在一个数组中,这样可以为每个站点的所有样本实现快速聚合。

genomics_reader_writer_image2

 
每个样本基因型字段的数组
 
由于那些使用VCF文件的人都非常清楚,VCF规范为数据格式化带来了歧义,这可能导致工具以意想不到的方式失败。 我们的目标是创建一个强大的解决方案,默认情况下接受格式错误的记录,然后允许我们的用户选择过滤条件。 例如,我们的一位客户使用我们的读者来摄取有问题的文件,其中一些概率值被存储为“nan”而不是“NaN”,这是大多数基于Java的工具所需要的。 自动处理这些简单问题使我们的用户可以专注于了解他们的数据意味着什么,而不是数据是否为正确格式化的。 为了验证我们读者的稳健性,我们针对由GATK和Edico Genomics等常用工具生成的VCF文件以及数据共享计划中的文件对其进行了测试。
 


诸如英国生物银行倡议分发的BGEN文件也可以类似地处理。 读取BGEN文件的代码与我们的VCF示例几乎完全相同:

spark.read.format("com.databricks.bgen").load(bgen_path)

 
这些文件读取器生成兼容的模式,允许用户编写适用于不同变异数据源的管道,并支持合并不同的基因组数据集。 例如,VCF阅读器可以获取具有不同INFO字段的文件目录,并返回包含公共字段的单个DataFrame。 以下命令读入BGEN和VCF文件中的数据并合并它们以创建单个数据集:

 vcf_df = spark.read.format(“com.databricks.vcf”).load(vcf_path)

bgen_df = spark.read.format(“com.databricks.bgen”)\

   .schema(vcf_df.schema).load(bgen_path)

big_df = vcf_df.union(bgen_df) # All my genotypes!!

由于我们的文件阅读器返回vanilla Spark SQL DataFrames,您可以使用Spark支持的任何编程语言(如Python,R,Scala,Java或纯SQL)来提取变体数据。 专门的前端API,例如Koalas,它在Apache Spark上实现了pandas数据帧API,并且还可以无缝地工作。

基因组数据的使用

由于每个变体级注释(VCF中的INFO字段)对应于DataFrame列,因此查询可以轻松访问这些值。 例如,我们可以计算小等位基因频率小于0.05的双等位基因变体的数量:

genomics_reader_writer_image6

Spark 2.4引入了高阶函数,简化了对数组数据的查询。 我们可以利用此功能来操纵基因型阵列。 要过滤基因型数组,使其仅包含至少有一个变异等位基因的样本,我们可以编写如下查询:

genomics_reader_writer_image5

使用更高阶函数操作基因型数组

 
如果您有VCF文件的tabix索引,我们的数据源会将基因组轨迹上的过滤器推送到索引并最大限度地降低I / O成本。 即使数据集增长超出单个机器可以支持的大小,简单查询仍然以交互速度完成。
 
正如我们在讨论摄取变体数据时所提到的,Spark支持的任何语言都可用于编写查询。 以上语句可以组合成单个SQL查询:

genomics_reader_writer_image4


使用SQL查询VCF文件
 

数据导出

我们相信,在不久的将来,组织将使用Delta Lake等技术存储和管理基因组数据,就像处理其他数据类型一样。 但是,我们知道向后兼容熟悉的文件格式以便与协作者共享或使用传统工具非常重要。
 
我们可以在我们的过滤示例的基础上创建一个块gzip压缩文件,其中包含等位基因频率小于5%的所有变体:

df.where(fx.expr("INFO_AF[0] < 0.05"))\

    .orderBy(“contigName”, “start”)\

    .write.format(“com.databricks.bigvcf”)\

    .save(“output.vcf.bgz”)

 
此命令并行对输出VCF的每个段进行排序,序列化和上载,因此您可以安全地输出群组规模的VCF。 每个染色体或甚至更小的粒度也可以输出一个VCF。
 
将相同数据保存到BGEN文件只需要对代码进行一次小修改:

df.where(fx.expr("INFO_AF[0] < 0.05"))\

    .orderBy(“contigName”, “start”)\

    .write.format(“com.databricks.bigbgen”)\

    .save(“output.bgen”)

下一步是什么

将数据提取到Spark中是大多数大数据作业的第一步,但这并不是大数据旅途的终点。
在接下来的几周内,我们将有更多博客文章,展示如何在这些读者和作者之上构建的功能可以扩展和简化基因组工作负载。 敬请关注!

_

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
机器学习/深度学习 SQL 数据采集
使用SQL和机器学习进行大规模自动化数据质量测试
使用SQL和机器学习进行大规模自动化数据质量测试
122 0
|
SQL 存储 NoSQL
基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-SQL 查询和分析
前言前面我们介绍了基于 MySQL + Tablestore 分层架构的订单系统。订单数据储存进入 Tablestore 后,用户可以使用 SDK 中的 API 访问数据,也可以继续使用 SQL 访问 Tablestore 中的数据。Tablestore 提供了多种 SQL 的接入方式,客户可以通过 DLA 访问 Tablestore,也可以利用 Tablestore 自身对 SQL 的支持能力,
1006 0
基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-SQL 查询和分析
|
SQL 存储 监控
Scheduled SQL: SLS 大规模日志上的全局分析与调度
本文总结了大规模日志全局分析的需求,讨论SLS上现有的典型分析方案,并延伸到 SLS 原生数据处理方案,介绍 Schedueld SQL 功能与最佳实践。
3837 0
Scheduled SQL: SLS 大规模日志上的全局分析与调度
|
安全 数据安全/隐私保护 云安全
高危预警| SQL数据库成主要攻击对象,或引发新一轮大规模勒索
云安全中心已提供免费7天试用服务,登陆控制台开启企业版试用功能,利用漏洞修复、基线检查、安全告警功能对系统做深度检查,及时修复当前存在的安全隐患。
14194 0
|
分布式计算 Spark SQL
【译】使用Spark SQL 运行大规模基因组工作流
原文链接:https://databricks.com/blog/2019/06/26/scaling-genomic-workflows-with-spark-sql-bgen-and-vcf-readers.html
1777 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
140 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
76 6