为异构的大数据运行环境构建数据管道

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

为异构的大数据运行环境构建数据管道

Pipeline61框架可以用于为异构的运行环境构建数据管道。它可以重用已经部署在各个环境里的作业代码,并提供了版本控制和依赖管理来解决典型的软件工程问题。

研究人员开发了大数据处理框架,如MapReduce和Spark,用于处理分布在大规模集群里的大数据集。这些框架着实降低了开发大数据应用程序的复杂度。在实际当中,有很多的真实场景要求将多个数据处理和数据分析作业进行管道化和集成。例如,图像分析应用要求一些预处理步骤,如图像解析和特征抽取,而机器学习算法是整个分析流里唯一的核心组件。不过,要对已经开发好的作业进行管道化和集成,以便支持更为复杂的数据分析场景,并不是一件容易的事。为了将运行在异构运行环境里的数据作业集成起来,开发人员必须写很多胶水代码,让数据在这些作业间流入流出。Google的一项研究表明,一个成熟的系统可能只包含了5%的机器学习代码,而剩下的95%都是胶水代码。

为了支持对大数据作业进行管道化和集成,研究人员推荐使用高级的管道框架,如Crunch、Pig和Cascading等。这些框架大都是基于单一的数据处理运行环境而构建的,并要求使用特定的接口和编程范式来构建管道。况且,管道应用需要不断演化,满足新的变更和需求。这些应用还有可能包含各种遗留的组件,它们需要不同的运行环境。因此,维护和管理这些管道变得非常复杂和耗时。

Pipeline61框架旨在为在异构的运行环境里维护和管理数据管道减少精力的投入,而不需要重写原有的作业。它可以将运行在各种环境里的数据处理组件集成起来,包括MapReduce、Spark和脚本。它尽可能重用现有的数据处理组件,开发人员就没有必要重新学习新的编程范式。除此之外,它为每个管道的数据和组件提供了自动化的版本控制和依赖管理。

现有的管道框架

大多数用于构建管道化大数据作业的框架都是基于单一的处理引擎而构建的(比如Hadoop),并使用了外部的持久化服务(比如Hadoop分布式文件系统)来交换数据。表A比较了几种最为重要的管道框架。

Crunch定义了自己的数据模型和编程范式,用于支持管道的写入,并在MapReduce和Spark上运行管道作业。Pig使用了一种基于数据流的编程范式来编写ETL(抽取、转换、加载)脚本,并在执行期被转换成MapReduce作业。Cascading为管道提供了基于操作符的编程接口,并支持在MapReduce上运行Cascading应用。Flume最初是为基于日志的管道而设计的,用户通过配置文件和参数来创建管道。MRQL(MapReduce查询语言)是一种通用的系统,用于在各种运行环境上进行查询和优化,如Hadoop、Spark和Flink。Tez是一个基于有向无环图的优化框架,它可以用于优化使用Pig和Hive编写的MapReduce管道。

Pipeline61与这些框架的不同点在于:

  • 支持对异构的数据处理作业(MapReduce、Spark和脚本)进行管道化和集成。
  • 重用现有的编程范式,而不是要求开发人员学习新的编程范式。
  • 提供自动化的版本控制和依赖管理,具备历史可追踪性和可重现性,这些对于管道的持续开发来说是非常重要的。

与Pipeline61类似,Apache Object Oriented Data Technology(OODT)数据栅格框架支持让用户从异构环境中捕捉、定位和访问数据。与Pipeline61相比,OODT提供了更具通用性的任务驱动工作流执行过程,开发人员必须编写程序来调用不同的任务。相反,Pipeline61专注于与当前的大数据处理框架进行深度集成,包括Spark、MapReduce和IPython。OODT使用了基于XML的管道配置,而Pipeline61为各种编程语言提供了编程接口。最后,OODT需要维护数据集的一般性信息和元数据。Pipeline61为管道里的IO数据和转换任务提供了显式的来源信息。因此,Pipeline61原生地支持历史数据管道或部分数据管道的重新生成和重新执行。

一个有趣的例子

我们的例子是一个嫌疑检测系统,图1展示了该系统的数据处理管道。系统收集来自各个部门和组织的数据,比如来自政府道路服务部门的机动车注册记录、来自政府税务部门的个人收入报告,或来自航空公司的航程记录。来自不同数据源的记录可能具有不同的格式,如CSV、文本、JSON,它们的结构是不一样的。

图1.嫌疑检测系统的数据处理管道。来自不同部门和组织的数据可能具有不同的格式和结构。CSV表示以逗号分隔的数据值,JSON表示JavaScript Object Notation,MR表示MapReduce,HDFS是Hadoop分布式文件系统。

在数据管道的各个阶段,不同的数据科学家或工程师们可能使用不同的技术和框架来开发数据处理组件,比如IPython、MapReduce、R和Spark。一些遗留的组件也可以通过Bash脚本或第三方软件进行集成。所以,管理和维护异构环境里持续变化的数据管道是一个复杂而沉闷的任务。使用新框架替代旧框架的代价是很高的,或许更加难以承受。在最坏的情况下,开发人员可能需要重新实现所有的数据处理组件。

另外,正如我们之前提过的那样,为了满足新的系统变更需求,管道应用程序需要保持演化和更新。例如,可能会有新的数据源加入进来,或者现有的数据源的格式和结构会发生变更,或者升级分析组件来提升性能和准确性。这些都会导致管道组件的持续变化和更新。在管道演化过程中提供可追踪性和可再现性会成为一个挑战。管道开发人员可能想检查管道的历史,用于比较更新前后有什么不同。另外,如果有必要,每个数据处理组件应该能够回滚到上一个版本。

Pipeline61

为了解决这些挑战性问题,Pipeline61使用了三个主要的组件:执行引擎触发器、监控器,以及管道管理器。数据服务提供了统一的数据IO层,用于完成枯燥的数据交换以及各种不同数据源之间的转换工作。依赖和版本管理器为管道里的数据和组件提供了自动化的版本控制和依赖管理。Pipeline61为开发人员提供了一套管理API,他们可以通过发送和接收消息进行管道的测试、部署和监控。

图2. Pipeline61架构。Pipeline61框架旨在为在异构的运行环境里维护和管理数据管道减少精力的投入,而不需要重写原有的作业。DAG表示有向无环图。

Pipe模型

Pipeline61将管道组件表示为pipe,每个pipe有一些相关联的实体:

  • pipe的名字必须是唯一的,而且要与pipe的管理信息具有相关性。名字里可以包含命名空间信息。
  • pipe的版本信息会自动增长。用户可以执行指定版本的pipe。
  • 管道服务器负责管理和维护pipe。pipe需要知道管道服务器的地址信息,在运行期间,它可以向管道服务器发送通知消息。
  • 输入和输出URL里包含了pipe的IO数据所使用的协议和地址。协议表示持久化系统的类型,如HDFS(Hadoop分布式文件系统)、JDBC(Java Database Connectivity)、S3(Amazon Simple Storage Service)、文件存储和其他类型的数据存储系统。
  • IO数据的输入格式和输出格式指明了数据的读取格式和写入格式。
  • 运行上下文指明了运行环境和运行框架所需要的其他信息。
  • 运行上下文与数据处理框架紧密相关。Pipeline61目前有三种主要的运行上下文:
  • Spark运行上下文包含了一个SparkProc属性,该属性为SparkSQL提供了一个转换函数,用于将输入RDD(弹性分布式数据集)转化成输出RDD,或者将输入DataFrame转换成输出DataFrame。
  • MapReduce运行上下文包含了一些结构化的参数,指明了MapReduce作业的Mapper、Reducer、Combiner和Partitioner。可以使用key-value的形式添加其他参数。
  • shell运行上下文包含了一个脚本文件或者内联的命令。Python和R脚本是shell pipe组件的子类型,它们可以使用更多由数据服务控制的输入和输出。shell pipe的不足之处在于,开发人员必须手动地处理输入和输出的数据转换。

图3展示了如何写一个简单的SparkPipe。基本上,开发人员只要使用SparkProc接口来包装Spark RDD函数,然后使用SparkProc初始化一个SparkPipe对象。

图3. 如何写一个简单的SparkPipe。开发人员使用SparkProc接口包装Spark RDD函数,然后使用SparkProc初始化一个SparkPipe对象。

Pipeline61让开发人员可以在逻辑层面将不同类型的pipe无缝地集成到一起。它提供了方法,用于将pipe连接起来形成管道。在将pipe连接起来之后,前一个pipe的输出就变成了下一个pipe的输入。在后面的案例学习部分,我们会展示一个更具体的例子。

执行引擎

执行引擎包含了三个组件。

管道服务器包含了消息处理器,用于接收和处理来自用户和任务的消息。用户可以通过发送消息来提交、部署和管理他们的管道作业和依赖。运行中的任务可以通过发送消息来报告它们的运行状态。运行时消息也可以触发一些事件,这些事件可以在运行期间调度和恢复进程。

有向无环图调度器遍历管道的任务图,并将任务提交到相应的运行环境。一个任务会在它的所有父任务都被成功执行之后进入自己的执行调度期。

任务启动器为pipe启动执行进程。目前,Pipeline61使用了三种类型的任务启动器:

  • Spark启动器会初始化一个子进程,作为执行Spark作业的驱动进程。它会捕捉运行时状态的通知消息,并将通知发送给管道服务器,用于监控和调试。
  • MapReduce启动器会初始化一个子进程,用于提交由pipe指定的MapReduce作业。在将执行状态发送给管道服务器之前,子进程会等待作业执行完毕,不管是成功还是失败。
  • shell启动器会创建一系列进程通道,用于处理shell脚本或者由shell pipe所指定的命令。在这些进程结束或者任何一个进程失败之后,相关的状态消息将被发送给管道服务器。

开发人员可以实现新的任务启动器,用于支持新的运行上下文:

  • 可以使用由执行框架(比如Hadoop和Spark)提供的API
  • 在已经启动的进程里初始化子进程,并执行程序逻辑。

理论上,任何可以通过shell脚本启动的任务都可以使用进程启动器来执行。

数据服务

每个pipe在运行期间都是独立执行的。pipe根据输入路径和格式来读取和处理输入数据,并将输出结果写入指定的存储系统。管理各种IO数据的协议和格式是件枯燥的事情,而且容易出错。所以,数据服务为开发人员代劳了这些工作。

数据服务提供了一组数据解析器,它们根据给定的格式和协议在特定运行环境里读取和写入数据。例如,对于一个Spark pipe来说,数据服务使用原生的Spark API来加载文件本文到RDD对象,或者使用SparkSQL API从JDBC或JSON文件加载数据到Spark DataFrame。对于Python pipe来说,数据服务使用Python Hadoop API加载CSV文件的数据到HDFS,并转换成Python DataFrame。基本上,数据服务是将数据协议和格式映射到特定运行环境的数据解析器。

我们可以扩展数据服务,实现并注册新的数据解析器。一些数据解析工具,如Apache Tika,可以作为数据服务的补充实现。

依赖和版本管理器

对于管道管理员来说,管理和维护管道生命周期是一件很重要的事情,同时也很复杂。 为了解决管道管理方面存在的痛点,依赖和版本管理器可以帮助用户来维护、跟踪和分析管道数据和组件的历史信息。

依赖和版本管理器为每个管道维护了三种类型的信息。管道执行跟踪过程为管道应用程序的每一个运行实例维护了一个数据流图。每个图的节点都包含了实例组件的元数据,比如启动时间、结束时间和运行状态。

图4. 在Pipeline61中维护的历史和依赖信息,第一部分。管道执行跟踪过程为管道应用程序的每一个运行实例维护了一个数据流图。

管道依赖跟踪过程(图5a)为每个管道组件的不同版本维护着历史元数据。它将每个组件的依赖信息保存成树状结构。保存在树中的元数据包含了最近更新的名字、版本、作者、时间戳,以及运行依赖包。

图5. 在Pipeline61中维护的历史和依赖信息,第二部分。(a) 管道依赖跟踪过程为每个管道组件的不同版本维护着历史元数据。(b) 数据快照包含了管道应用程序每一个运行实例的输入输出位置和样本数据。

数据快照(图5b)包含了管道应用程序每一个运行实例的输入输出位置和样本数据。

Pipeline61用户可以通过这些历史信息来分析管道历史,并通过重新运行旧版本的管道来重新生成历史结果。

案例学习

以下的案例学习展示了Pipeline61的效率和优势。示例使用了来自不同组织的三种格式的数据源,包括CSV、文本和JSON。两组数据科学家使用少量手写的MapReduce和Python程序来对整体数据集进行分析。我们引入了我们的管道框架,用于自动执行管道任务和管道管理。图6展示了我们是如何在Pipeline61里指定管道的。

图6. 在Pipeline61里指定管道。在相关的案例学习里,两组数据科学家使用少量手写的MapReduce和Python程序来对整体数据集进行分析。

首先,我们指定了三种数据映射器——csvMapper、jsonMapper和textMapper——用于处理不同格式的输入数据。我们指定了三个MapReduce pipe,并将三种mapper分别作为数据解析器传递进去。

接下来,我们使用RDD函数DataJoinerProc指定了一个叫作dataJoiner的Spark pipe,用于组合三种mapper的输出结果。

最后,我们指定了两组分析pipe组件,从dataJoiner那里消费输出结果。因为每个分析分支关注不同的输入特征,我们为每个分析组件添加了一个特征抽取器。然后我们将这两个分析组件实现为Python pipe和Spark pipe。最后,我们使用连接操作将这些pipe连接在一起,组成了整体的数据流。

在这个场景里,如果使用现有的管道框架,比如Crunch和Cascading,那么开发人员需要重新实现所有的东西。这样做存在风险,也非常耗时。它不仅对重用已有的MapReduce、Python或shell脚本程序造成限制,而且也对数据分析框架(如IPython和R)的使用造成约束。

相反,Pipeline61专注于管理和管道化异构的管道组件,所以它可以显著地减少集成新旧数据处理组件所需要的投入。

管道后续的开发和更新也会从Pipeline61的版本和依赖管理中获得好处。例如,如果开发人员想要更新一个组件,他们可以从数据快照历史中获得组件最新的输入和输出样本。然后,他们基于样本数据实现和测试新的程序,确保新版本组件不会对管道造成破坏。

在将更新过的组件提交到生产环境之前,开发人员可以为新组件指定一个新的管道实例,并将它的输出结果与生产环境的版本进行比较,对正确性进行双重检查。除此之外,如果新组件在部署之后出现错误,管道管理器可以很容易地回滚到前一个版本。管道服务器自动维护着每个组件的历史数据和依赖,所以可以实现回滚。

这种DevOps风格的支持对于维护和管理管道应用程序来说是很有意义的,而现有的管道框架很少会提供这些支持。

不过Pipeline61也存在不足。它不检查各个数据处理框架数据结构的兼容性。到目前为止,开发人员在进行管道开发时,必须手动对每个pipe的输入和输出进行手动测试,确保一个pipe的输出可以作为下一个pipe的输入。为了解决这个问题,我们打算使用现有的结构匹配(schema-matching)技术。

当然,在管道运行期间,大部分中间结果需要被写到底层的物理数据存储(如HDFS)里,用于连接不同运行上下文的pipe,同时保证管道组件的可靠性。因此,Pipeline61的管道运行比其他框架要慢,因为其他框架独立运行在一个单独的环境中,不需要与外部系统集成。我们可以通过只保存重要的数据来解决这个问题。不过,这需要在可靠性和历史管理完整性之间做出权衡。


本文作者:Dongyao Wu Liming Zhu

来源:51CTO

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
343 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
52 2
|
29天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
68 4
|
1月前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
24 4
|
1月前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
58 3
|
1月前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
71 2
|
1月前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
115 2
|
2月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
1月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
89 1
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3