开发者学堂课程【Databricks数据洞察公开课:使用 Databricks+Confluent 进行实时数据采集入湖和分析】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1058/detail/15339
使用 Databricks+Confluent 进行实时数据采集入湖和分析
内容介绍:
一、 数据采集
二、 入湖
三、 分析
四、 总结
本节课主要介绍如何使用 DDI 和 Confluent 进行实时数据采集、入湖和分析
一、数据采集
那在这张架构图中,在最左边可以看到一些 IoT 设备和移动应用,这些 IoT 设备和移动应用会采集设备或者应用的一些运行数据,再发布到 Kafka 的 Broker中,发布完后,就可以在 DDI 中使用Spark Structured Streaming去消费 Kafka的数据,对这些实施的数据去做 ETL 的处理,再把处理的结果存储到 OSS 数据湖中,就可以使用 BI Tools 或者是机器学习的工具去展示这些数据,或者做一些机器学习的训练。
本次课程中会使用了一些 Python 的脚本,来周期性的去向 Kafka的 Broker 发送数据,以模拟数据的采集,数据的发送频率,大概是1万条每秒,就可以使用 DDI The Spark Structured Streaming ,去连接 Kafka 的 Broker 去消费采集到的这些实时的数据再将这些实时的数据存储到 lake 数据湖中,此外,对采集到的数据做了简单的处理和分析,使用的是 Spark circle,对数据湖中的数据做简单的分析和简单的展示。
下图的数据集用来模拟实时数据,产生和采集的数据集在 Kaggle 上下载。
这个数据集是纽约市出租车的数据,数据集存储到 csv 文件里,第一列是它产生的时间也就是它的 key ,第二列是它的价格,就是这一次出租车交易的价格,第三列是乘客上车的时间,第第四列和第五列是上车的位置,上车的经纬度。
然后第六列和第七列是下车的经纬度,最后一列是乘客的数量,就要用这样一个数据集去模拟数据的产生。
先进入到 Confluent 的管控台,需要使用到的这个 Confluent 已经创建好了:Jinxi-DDI-test
在发送消息的时候需要使用到集群的 ID :ab9b2deb067846e3
登录到 Confluent的 control center ,用户名和密码呢是在创建集群的时候设置好的,在 control center里会有这个 Confluent 集群的详细的信息,比如 Broker 的数量,和 Broker 的吞吐量,产生和消费的吞吐量,还有 Topic 的数量和 Partition 的数量等信息,先需要在 topic 这里去创建一个 topic 。
用来接收发布到这里的数据。
topic 的名称设置为 nyc_taxi_data ,partation 的数量设置成 3 。
那 topic 创建之后创建好之后,就可以往 topic 发布数据了,在发布数据的时候是需要用到集群 ID 的,再回到 Python 的脚本,在 Python 里需要提供集群的 ID ,control center username 和 password ,为了安全性,把 password 存到另外一个 py 文件里,用户需要填充在control center username 下方,如下图:再看 topic 的名称。
创建 conf 的 dct ,在 dct 里,需要指定 bootstrap.servers 地址,地址是根据 casp IP 拼接得到的,其他都是样本代码,之后会生成 producer, producer 是来源于 Confluent kafka 库,用户是需要去安装这个库的,已经安装好之后
用户可以使用 conda 或者 python pip 进行安装。
之后把 True 文件打开,去一条一条的把 csv 文件里的数据读取出来,然后发送到 Confluent 集群里,如下图。
已经在生产消息了后,到 Confluent 的 control center 去,可以发现已经在生产消息。
再进入 topic 里,点击 message ,直接跳到 offset 0最开始的消息,在 control center 就可以看到消息的一个生产情况以及具体产生了哪些消息。
还可以去定义消息的 Scheme,企业版 kafka ,能提供的高级特性,除了这个 Scheme registry 之外
企业版的 Confluent 还提供了connect ,还有 ksqIDB 的功能。
目前数据的采集和发布的路已经走通了,模拟数据的采集过程,并且把数据发布到kafka 的 Broker上,下一步需要去 DDI notebook 连接到 kafka 的 Broker上,对kafka 的 Broker 中的实时数据进行消费,进行ETL 的处理之后,把这些实时的数据做入湖的操作。
二、入湖
notebook 准备好,需要注意的是就是需要在这里增加一个 kafka 的 Connect 的假包,如下图。
然后创建数据库,使用这个数据库去存储数据表。
按照之前的那个 taxi data 的 schema去创建一个表,和之前在创建 csv 文件里的表是一致的。
首先 key是时间戳, fare_amount是打车的价格,之后就是上车的时间,上车的经纬度,下车的经纬度,还有乘客数量,是使用 delta 来存储的,使用 DDI 后,应该对这一个格式比较熟悉了,此外还设置了表的属性,这两个属性就是 DDI 引擎一些特性,第一个特性,会优化写,可把 OSS 的写入操作做合并,从而能防止小文件的产生。
OutCompact 因为流失处理会产生大量的小文件,这些小文件,如果不对它进行合并操作,会导致 OSS 中产生大量的小文件,然后导致元数据也变得非常大,一方面元数据会变得不可扩展,另一方面也会导致查询速度下降,outCompact会定期的把小文件合并,从而避免查询性能的下降以及元数据的扩展问题。
此后就可以连接到 Confluent 集群进行消费,这里消费的逻辑与发送的逻辑比较相近,需要给定 class id,Confluent server 、 username 、 password , password为了安全起见,存到一个环境变量里,环境变量只要作业运行结束,它就消失了。
Topic 也是在 control center 去创建的,checkpoint 是在消费 kafka 数据的时候,会去记录消费的 offset ,及时地去记录,如果作业中断了或者异常停止,还可以继续重启这个作业后,可以继续从对应的 offse 去消费, Data Lake 就是存储数据的地方,之后就可以使用Spark 到 Kafka 的connect 。
去读取 Kafka 中的数据,读取过来之后,数据是 KV 队的一个形式的,数据是存储在 value 里的,所以在这里创建了一列称为 data , data 从这个 value 里面去取值, value 是一个二进制形式,需要先把它转成 string ,转成后是 json 的串,然后把里面的数据都给 select 出来, select 出来后就把它写入到 data 表中,把它运行起来运行起来后,简单的 select 几条数据出来:Select * from taxi_record limit 10;
在 OSS 的 browser 里, ingest 过来的数据,还有 ingest 的过程中产生的这些,这个 checkpoint 的信息。数据已经 ingest过来,应该是在慢慢增加的。
刷新后可以以看到数据已经 ingest 到了 lake 中了,我们使用 spark circle 去做简单的数据分析和可视化的展示。
三、分析
现在统计每个月的交易量,从这张交易量走势图可以看出,在3 4 5 6月,交易量比较高,在8月会达到一个交易量的低谷。
再来统计在每个周内,从周一到周日的交易量的变化,那从这张图可以看出周二是交易量的低谷,周二后会有一个慢慢的攀升一个过程,到周六、周日会达到一个交易量的高峰,周一到周二又降到了低谷。
统计一天中每个时间段的交易量,从这张图可以看出就是每天的11点,每天中午和下午是交易量的低谷,会造成这样销量低谷的原因,是否与交易价格有关系,所以来看每个时间段打车的费用。
从这张图可以看出在中午打车的费用比较高,所以和上一张图的结论吻合,是因为打车价格攀升,导致交易量的萎靡。
来统计每个年份打车费用的均值,从上图可以看出打车的费用连年上升。
从上图可以看出每个年份的交易量和打车的费用是成反比的关系,价格的攀升导致了交易量的下滑。
从上图看教育金额的分布,出租车打车价格是主要是分布在3美元到20美元之间。
在以上的查询分析的过程中,流式处理作业是一直处于运行状态,也就是意味着流式处理作业的运行和下方批式处理作业没有冲突,可以并行运行。
四、总结
模拟了实时数据的产生、采集,并且把这些数据发布到了 Kafka 的 Broker 中,之后使用了 DDI 消费 kafka , Broker 中的数据实现了实时的数据入湖,并且使用了DDI 的 Zeplin ,基于入湖的实时数据,做了简单的分析。