【译】使用Spark SQL 运行大规模基因组工作流

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 将数据提取到Spark中是大多数大数据作业的第一步,但这并不是大数据旅途的终点。

原文链接:

https://databricks.com/blog/2019/06/26/scaling-genomic-workflows-with-spark-sql-bgen-and-vcf-readers.html

编译:

诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作。

使用Spark SQL 运行大规模基因组工作流


image

在过去十年中,随着基因组测序价格下降,可用基因组数据的数量逐渐激增。研究人员现在已经能够从英国生物银行等项目的数十万人群中探测遗传变异和疾病之间的关联。这些分析将使人们更深入地了解疾病的根本原因,从而治疗当今一些主要的疾病问题。但是,目前用来分析这些数据集的工具还没有跟上数据增长的步伐。

许多用户习惯于使用命令行工具(如plink或单节点Python和R脚本)来处理基因组数据。但是,单节点工具暂时还不足以达到TB级甚至更高级别的程度。 目前Broad研究所的Hail项目建立在Spark之上,可以将计算分配到多个节点,但它要求用户除了Spark之外还要学习新的API,并鼓励数据以特定于Hail的文件格式存储。由于基因组数据不是孤立地保持其价值,而是针对结合不同数据来源(例如医疗记录,保险索赔和医学图像)进行分析后的一种输入,因此单独的系统可能导致严重的并发症。

我们相信Spark SQL已经成为处理各种不同风格的大量数据集的标准,代表了通向简单、可扩展的基因组工作流程的最直接途径。 Spark SQL用于以分布式方式来对(ETL)大数据进行提取,转换和加载。 ETL是生物信息学所涉及工作的90%,从提取突变,用外部数据源注释,到为下游统计和机器学习分析做准备。 Spark SQL包含Python或R等语言的高级API,这些API易于学习,并且比传统的生物信息学方法更容易阅读和维护代码。
在这篇文章中,我们将介绍在基因组数据和Spark SQL之间提供强大及灵活连接的数据Reader和Writer。

数据读取

Reader是Spark SQL中的数据源,因此VCF和BGEN可以像任何其他文件类型一样被读入Spark
DataFrame。 在Python中,VCF文件的目录读取如下所示:


  .format("com.databricks.vcf")\

  .option("includeSampleIds", True)\

  .option("flattenInfoFields", True)\

  .load("/databricks-datasets/genomics/1kg-vcfs")

VCF头中定义的数据类型将转换为输出DataFrame的架构。 此示例中的VCF文件包含许多成为可查询字段的注释:

image

Spark SQL DataFrame 中VCF文件的内容

适用于群组中每个样本的字段(如被调用的基因型)存储在一个数组中,这样可以为每个站点的所有样本实现快速聚合。

image

每个样本基因型字段的数组

由于那些使用VCF文件的人都非常清楚,VCF规范为数据格式化带来了歧义,这可能导致工具以意想不到的方式失败。 我们的目标是创建一个强大的解决方案,默认情况下接受格式错误的记录,然后允许我们的用户选择过滤条件。 例如,我们的一位客户使用我们的读者来摄取有问题的文件,其中一些概率值被存储为“nan”而不是“NaN”,这是大多数基于Java的工具所需要的。 自动处理这些简单问题使我们的用户可以专注于了解他们的数据意味着什么,而不是数据是否为正确格式化的。 为了验证我们读者的稳健性,我们针对由GATK和Edico Genomics等常用工具生成的VCF文件以及数据共享计划中的文件对其进行了测试。

诸如英国生物银行倡议分发的BGEN文件也可以类似地处理。 读取BGEN文件的代码与我们的VCF示例几乎完全相同:

spark.read.format("com.databricks.bgen").load(bgen_path)

这些文件读取器生成兼容的模式,允许用户编写适用于不同变异数据源的管道,并支持合并不同的基因组数据集。 例如,VCF阅读器可以获取具有不同INFO字段的文件目录,并返回包含公共字段的单个DataFrame。 以下命令读入BGEN和VCF文件中的数据并合并它们以创建单个数据集:


bgen_df = spark.read.format(“com.databricks.bgen”)\

   .schema(vcf_df.schema).load(bgen_path)

big_df = vcf_df.union(bgen_df) # All my genotypes!!

由于我们的文件阅读器返回vanilla Spark SQL DataFrames,您可以使用Spark支持的任何编程语言(如Python,R,Scala,Java或纯SQL)来提取变体数据。 专门的前端API,例如Koalas,它在Apache Spark上实现了pandas数据帧API,并且还可以无缝地工作。

基因组数据的使用

由于每个变体级注释(VCF中的INFO字段)对应于DataFrame列,因此查询可以轻松访问这些值。 例如,我们可以计算小等位基因频率小于0.05的双等位基因变体的数量:

image
Spark 2.4引入了高阶函数,简化了对数组数据的查询。 我们可以利用此功能来操纵基因型阵列。 要过滤基因型数组,使其仅包含至少有一个变异等位基因的样本,我们可以编写如下查询:

image

使用更高阶函数操作基因型数组

如果您有VCF文件的tabix索引,我们的数据源会将基因组轨迹上的过滤器推送到索引并最大限度地降低I / O成本。 即使数据集增长超出单个机器可以支持的大小,简单查询仍然以交互速度完成。

正如我们在讨论摄取变体数据时所提到的,Spark支持的任何语言都可用于编写查询。 以上语句可以组合成单个SQL查询:

image

使用SQL查询VCF文件

数据导出

我们相信,在不久的将来,组织将使用Delta Lake等技术存储和管理基因组数据,就像处理其他数据类型一样。 但是,我们知道向后兼容熟悉的文件格式以便与协作者共享或使用传统工具非常重要。

我们可以在我们的过滤示例的基础上创建一个块gzip压缩文件,其中包含等位基因频率小于5%的所有变体:


    .orderBy(“contigName”, “start”)\

    .write.format(“com.databricks.bigvcf”)\

    .save(“output.vcf.bgz”)

此命令并行对输出VCF的每个段进行排序,序列化和上载,因此您可以安全地输出群组规模的VCF。 每个染色体或甚至更小的粒度也可以输出一个VCF。

将相同数据保存到BGEN文件只需要对代码进行一次小修改:


    .orderBy(“contigName”, “start”)\

    .write.format(“com.databricks.bigbgen”)\

    .save(“output.bgen”)

下一步是什么

将数据提取到Spark中是大多数大数据作业的第一步,但这并不是大数据旅途的终点。
在接下来的几周内,我们将有更多博客文章,展示如何在这些读者和作者之上构建的功能可以扩展和简化基因组工作负载。 敬请关注!

_

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
8天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
78 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
34 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
111 13
|
4月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
4月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
60 6
|
4月前
|
存储 SQL C++
对比 SQL Server中的VARCHAR(max) 与VARCHAR(n) 数据类型
【7月更文挑战7天】SQL Server 中的 VARCHAR(max) vs VARCHAR(n): - VARCHAR(n) 存储最多 n 个字符(1-8000),适合短文本。 - VARCHAR(max) 可存储约 21 亿个字符,适合大量文本。 - VARCHAR(n) 在处理小数据时性能更好,空间固定。 - VARCHAR(max) 对于大文本更合适,但可能影响性能。 - 选择取决于数据长度预期和业务需求。
371 1
|
3月前
|
SQL 安全 Java
驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“The server selected protocol version TLS10 is not accepted by client
驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“The server selected protocol version TLS10 is not accepted by client
470 0