开发者学堂课程【Cassandra数据库入门与实战:数据湖大数据处理之Lambda架构】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/784
数据湖大数据处理之Lambda架构
内容简介:
一、大数据 数据湖和Lambda架构简介
二、Apache cassandra, Spark和pulsar的lambda实现
三、示例程序介绍
四、实例
一、大数据 数据湖和Lambda架构简介
以下是大数据的5v定义
James Dixon是第一个提出数据湖概念的人。他们对数据湖和自然的由水构成的湖类比。
当讨论数据服务的时候,有两个层面的意思。第一个是静态的含义,仅仅是指一个企业的所有数据的集合,反应了大数据的每个方面,例如数据来源的多样性,数据格式的多样性,巨量的数据以及快速的数据生成等,以上是典型的大数据的特性。
最重要的一点,数据湖也是一个较为动态从端到端的处理框架。
例如从原始数据开始,到技能数据的处理,将其存储到企业的中心数据湖里做进一步的分析处理,产生的商业决策信息可以供下游系统利用。
例如数据可视化,商业智能,数据科学,人工智能,深度学习。从整个数据的生成,到最后数据被系统利用也是企业数据湖组成的关键部分。
快数据更多的在于整个数据的分析处理提取,最后生成数据决策信息的整个过程。传统的数据仓在当下的商业环境下已经完全不能适用。
当下商业环境不仅强调数据要快,而且要大。当把传统的处理数据方式和当下的处理数据方式整合在一起来处理大数据的环境有不同的数据架构,lambda架构是较为通用的架构。
Lambda架构主要分为三个层面,批处理层,速处理层,服务层。
批处理层和传统的批处理架构基本相似,速处理层相当于当下实用的流处理,对实时数据的分析,处理,挖掘,但教育批处理层速处理层不能较大范围的处理历史数据,所以速处理层更多的是分析最近的数据。
服务层包括许多数据视图,这些数据视图可以从批处理层来,也可以从实时处理层来。
数据视图生成的目的是为了加快速度,这些数据视图相当于已经被处理完成,不必再做多余的计算,只需要简单的查询来获取所需要的信息。
整个lambda结构可以在企业数据湖环境里解决高容错低延迟问题,帮助数据进行线性扩展,当有新的技术和服务出现时可以被轻易容纳。
二、Apache cassandra, Spark和pulsar的lambda实现
Apache Cassandra
作为数据存储层,主要应对批处理层和服务处理层。
分布式,非主从结构,宽列,开源NoSQL数据库,因为其没有主从结构,所以扩展性极强。
而且每个数据会有多个备份,意味着如果一个主数据断层,还有其他备份数据来满足要求。如果系统维护好,可以做到零当机。Cassandra对于多数据中心的无缝数据复制,这是很多数据库没有的功能。Cassandra是能和云进行配合的数据库,可以轻松支持混合及多云的部署。
Apache spark
定义spark为一种用于大规模数据处理的统一分析引擎。和hadoop性能相似,也是用于大规模数据处理的分析。但是haddoop本质上是一个批处理模式,无法对实时的数据进行处理,需要利用其他技术来辅助其分析。它并不是一个真正意义上的数据库,所以每次进行读写时,会有较多磁盘读写操作,由此可见其性能不够理想。但可以将spark理解为一个内存化的hadoop,显而易见的优势就是其比hadoop快速。
同时spark可以处理不同类型的数据,也有自己的流处理模块,但不是一个实际的流数据处理。Spark还支持机器学习,图形数据库,SQL语言。由此可见,spark是一个性能非常强大的分析引擎。
Apache Kafka or Pulsar
从消息处理和流处理的角度来分析kafka和pulsar 相似,它都是一个大规模,分布式消息流数据处理平台。但是pulsar的优势在于其计算层和数据储存层是分开的,带来的优点是可以更好的系统扩充和负载均衡。可以分开的扩容计算层和存储层,计算层是无状态的,所以当扩展计算层时,对系统影响很小,而且会极其快速。同时也会更好的实现负载均衡。
Pulsar技术支持无缝跨数据中心数据复制,这也是kafka没有拥有的功能。Pulsar还拥有更灵活的消息处理模式,例如发布订阅,消息队列,混合模式。
以下将介绍如何使用三个技术来实现lambda架构。
在批处理层用cassandra原始数据的主数据库,在速处理层用pulsar来支持接收源数据流,在批处理层,有定时的spark job,Spark job从原始数据库里读取数据进行分析处理,把生成的数据放入到服务层的数据库里,服务层数据库就代表批处理数据视图。在速处理层,实时的数据流进入之后。
可以定义pulsar函数,Pulsar是对实时数据进行处理的手段,可以对数据进行过滤,集成。通过pulsar可以把新的数据放到另一个pulsar的主题里。通过pulsar和cassandra的连接器写进服从的相关的视图里。作为终端用户可以查询数据。
以下为lambda架构需要用到的技术栈和实现平台
实现平台用到的是dse,因为可以用一个dse集群来实现Cassandra和spark的所有功能。
三、示例程序介绍
示例程序的应用场景是极简化的石油钻头探测器时序数据模拟。以下为时序程序中所包含的数据内容包括钻头id,传感器id,传感器类型及传感器的读取时间,读取的值等。
应用场景就是每个钻探地点有多个钻探头,每个钻探头有两种传感器,一种是温度,另一种是速度,每个传感器每隔一段时间(一秒或一分钟)采集一次数据。需要回答的商业问题,例需要监控每个钻头的健康状况,防止其过热或速度过快。从批处理视图的角度来看,需要记录每个钻头每日的平均温度和转速,每天一次批处理。想从诉处理视图的角度来说,记录最近一天之内出现钻头过热或速度过快的情况,实时进行处理。
演示程序里所需部件有负载生成器,负载加载器,每日spark汇总程序,pulsar消息发布器,parser函数,pulse 摄氏度连接器。
负载生成器是按照所需的原始数据格式,生成一系列的模拟数据,生成的数据放在一个csv格式的文件中。可以控制钻头数量及传感器数量或采集数据的间隔时间。从批处理层面来讲,把生成的数据加载到原始数据库里生成一张cassandra表。
由每日的spark汇总程序对主数据库中的原始数据每天进行汇总,将汇总好的数据写入批处理视图数据库中,位于服务层。Pulsar消息发布器负责将深沉的原始数据以消息的方式实时发送到pulsar主题中。当发送到pulsar主题后,就会有pulsar函数对原始数据信息流进行实时数据流处理,将处理过的数据写入新的pulsar主题中。新生成的pulsar主题对应的是服务层中速处理视图。新的pulsar主题到数据库是由pulsar cassandra连接器将处理过的实时数据自动写入速处理视图数据库中的
以下为cassandra数据表模型
主要有三个cassandra键空间,分别对应于原始数据主数据库表,批数据视图数据库表。速处理层视图数据库表。
Pulsa主题如下:
主要有两个,第一个主题主要对应原始钻头传感器数据以信息的形式发布到主题中;另一个主题对应流处理过的感应器预警数据以信息的形式发布到这个主题中。
例如过滤掉旧数据,过滤掉一些正常运转的钻头是否过速或过热并不是该主题所关注的内容,只关注预警钻头的信息。
整个数据处理流程概况:
第一步,预处理。预处理包括创建cassandra键空间和表(cassandra cql表);生成模拟的工作负载文件(负载生成器)。
第二步,数据加载。在批处理层中,将源数据加载到原始数据主数据库中(负载加载器);在速处理层中,将源数据以信息的方式实时发送到pulsar主题中(Pulsa主题发布器)。
第三步,数据处理,视图数据生成。在批处理层中,将每日源数据批处理写入批处理视图(每日spark汇总程序);在速处理层中将当日数据实时处理,写入速处理视图(pulsar函数,pulsar cassandra连接器)。
第四步,数据分析。终端用户可以使用spark sql来访问不同数据。从不同的数据视图中来访问所需要的信息。
将所有cassandra被删除,确保没有所需要的键空间。第一步,创建cassandra所需要的键空间和表,能发现的是,每个键空间的数据表里是没有数据的,都是zero date。
生成模拟数据利用java程序制作,编译运行此程序,可以发现产生数据数量。数据都是关于钻头的信息及模拟数据生成频率。负载加载器是把源数据加载到了批处理层的数据库里。此时返回可以得到关于传感器的值及钻头信息,而另外两个数据库仍然没有任何数据。
第二步,运行spark job,相当于对主数据库的数据进行了一次汇总,按照每天每个钻头平均速度和温度显示。
运行之后,可以看到数据被显示出来。每个数据对应的是每个钻头在某一天的平均转速和平均温度,此时批处理层的模拟数据已经运行完毕。速处理层运用pulsar,部署一个pulsar函数到function里,同时sync负责把从函数里生成的数据写入到服务层数据库中。之后继续运行一个message发布器,message发布器,会把原始数据发布到第一个pulsar数据主题中,第一个数据会被function进行数据处理,处理好的数据会被放到第二个pulsar数据主题中。第二个数据中有主题会Sync写入到下游数据库中。这些数据都属于内部,无法被显示出来,所以此时无法显示数据。
发布原始数据时,经过整个处理过程可以看到速处理层的原始数据里面会有数据。下一步回到速处理视图里会发现已经开始显示数据,且显示出来的只是最近时间段中数值极大的数据,最开始的历史数据已经被function自动过滤。由此看来,从整个端到端的数据流程已经处理完毕。
四、实例
根据官方文档,如何使用spark来连接源数据库cassandra实例。
首先需要具备的前提条件是已经开通对象存储oss;拥有个人数据库cassandra实例;获取cassandra实例的私网连接点。Cql接口。数据库用户名,数据库密码,这些信息会被用到连接cassandra实例;需要创建数据表并植入数据,用来验证spark可以正确的连接到数据表里读取数据;需要为dl a spark访问cassandra实例所需要的安全锁id和交换机id。安全锁id和交换机id主要应用于dla spark这些访问实例中,需要挂在虚拟网卡上才可以访问cassandra实例。创建虚拟网卡必须使用安全锁id和交换机id,由此,用户必须提供安全锁id和交换机id。然后查看ip所在的网段(可通过专有网络控制台查看),接着修改白名单。这样Cassandra实例才能被正常访问,不被防火墙阻挡。查找完安全锁id后就可以开始处理。开始书写代码时需要插入查找到的连接cassandra实例的信息。代码主要功能就是写表和读表。将把代码打包上传到Oss路径下后进入dra spark的控制台,必须确保代码含有以上所述数据。所有的id都是用于配置创建虚拟网卡访问cassandra实例所需要的。点击运行后,即可实现spark与cassandra连接。通过日志即可了解spark已经连接成功Cassandra实力。
以下为基本步骤:
第一步开通oss存储;第二部拥有cassandra实例;第三步,从cassandra控制台获取连接cassandra实例所需要的信息,例如用户名,密码及其连接的信息;第四步,准备一张测试表,用于连接cassandra的测试。第五步,网络打通,关键要具备安全锁id和交换机id用于创建spark的虚拟网卡。拥有虚拟网卡且保证安全所没有被限制后即可用于访问