使用 Databricks+Confluent 进行实时数据采集入湖和分析| 学习笔记

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 快速学习使用 Databricks+Confluent 进行实时数据采集入湖和分析

开发者学堂课程【Databricks数据洞察公开课使用 Databricks+Confluent 进行实时数据采集入湖和分析】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/1058/detail/15339


使用 Databricks+Confluent 进行实时数据采集入湖和分析

 

内容介绍:

一、   数据采集

二、   入湖

三、   分析

四、   总结

 

本节课主要介绍如何使用 DDI Confluent 进行实时数据采集、入湖和分析

 

一、数据采集


image.png

那在这张架构图中,在最左边可以看到一些 IoT 设备和移动应用,这些 IoT 设备和移动应用会采集设备或者应用的一些运行数据,再发布到 Kafka Broker中,发布完后,就可以在 DDI 中使用Spark Structured Streaming去消费 Kafka的数据,对这些实施的数据去做 ETL 的处理,再把处理的结果存储到 OSS 数据湖中,就可以使用 BI Tools 或者是机器学习的工具去展示这些数据,或者做一些机器学习的训练。

本次课程中会使用了一些 Python 的脚本,来周期性的去向 Kafka Broker 发送数据,以模拟数据的采集,数据的发送频率,大概是1万条每秒,就可以使用 DDI The Spark Structured Streaming ,去连接 Kafka Broker 去消费采集到的这些实时的数据再将这些实时的数据存储到 lake 数据湖中,此外,对采集到的数据做了简单的处理和分析,使用的是 Spark circle,对数据湖中的数据做简单的分析和简单的展示。

下图的数据集用来模拟实时数据,产生和采集的数据集在 Kaggle 上下载。

image.png

这个数据集是纽约市出租车的数据,数据集存储到 csv 文件里,第一列是它产生的时间也就是它的 key ,第二列是它的价格,就是这一次出租车交易的价格,第三列是乘客上车的时间,第第四列和第五列是上车的位置,上车的经纬度。

然后第六列和第七列是下车的经纬度,最后一列是乘客的数量,就要用这样一个数据集去模拟数据的产生。

先进入到 Confluent 的管控台,需要使用到的这个 Confluent 已经创建好了:Jinxi-DDI-test

image.png

在发送消息的时候需要使用到集群的 ID ab9b2deb067846e3

image.png

登录到 Confluent control center ,用户名和密码呢是在创建集群的时候设置好的,在 control center里会有这个 Confluent 集群的详细的信息,比如 Broker 的数量,和 Broker 的吞吐量,产生和消费的吞吐量,还有 Topic 的数量和 Partition 的数量等信息,先需要在 topic 这里去创建一个 topic

image.png

用来接收发布到这里的数据。

topic 的名称设置为 nyc_taxi_data partation 的数量设置成 3

image.png

topic 创建之后创建好之后,就可以往 topic 发布数据了,在发布数据的时候是需要用到集群 ID 的,再回到 Python 的脚本,在 Python 里需要提供集群的 ID control center username password ,为了安全性,把 password 存到另外一个 py 文件里,用户需要填充在control center username 下方,如下图:再看 topic 的名称。

image.png

创建 conf dct ,在 dct 里,需要指定 bootstrap.servers 地址,地址是根据 casp IP 拼接得到的,其他都是样本代码,之后会生成 producer producer 是来源于 Confluent kafka 库,用户是需要去安装这个库的,已经安装好之后

用户可以使用 conda 或者 python pip 进行安装。

image.png

之后把 True 文件打开,去一条一条的把 csv 文件里的数据读取出来,然后发送到 Confluent 集群里,如下图。

image.png

已经在生产消息了后,到 Confluent control center 去,可以发现已经在生产消息。

image.png


再进入 topic 里,点击 message ,直接跳到 offset 0最开始的消息,在 control center 就可以看到消息的一个生产情况以及具体产生了哪些消息。

还可以去定义消息的 Scheme,企业版 kafka ,能提供的高级特性,除了这个 Scheme registry 之外

image.png

企业版的 Confluent 还提供了connect ,还有 ksqIDB 的功能。

image.png

目前数据的采集和发布的路已经走通了,模拟数据的采集过程,并且把数据发布到kafka Broker上,下一步需要去 DDI  notebook 连接到 kafka Broker上,对kafka Broker 中的实时数据进行消费,进行ETL 的处理之后,把这些实时的数据做入湖的操作。

 

二、入湖


notebook 准备好,需要注意的是就是需要在这里增加一个 kafka Connect 的假包,如下图。

image.png

然后创建数据库,使用这个数据库去存储数据表。

按照之前的那个 taxi data schema去创建一个表,和之前在创建 csv 文件里的表是一致的。

image.png

首先 key是时间戳, fare_amount是打车的价格,之后就是上车的时间,上车的经纬度,下车的经纬度,还有乘客数量,是使用 delta 来存储的,使用 DDI 后,应该对这一个格式比较熟悉了,此外还设置了表的属性,这两个属性就是 DDI 引擎一些特性,第一个特性,会优化写,可把 OSS 的写入操作做合并,从而能防止小文件的产生。

image.png

OutCompact 因为流失处理会产生大量的小文件,这些小文件,如果不对它进行合并操作,会导致 OSS 中产生大量的小文件,然后导致元数据也变得非常大,一方面元数据会变得不可扩展,另一方面也会导致查询速度下降,outCompact会定期的把小文件合并,从而避免查询性能的下降以及元数据的扩展问题。

此后就可以连接到 Confluent 集群进行消费,这里消费的逻辑与发送的逻辑比较相近,需要给定 class idConfluent server username password   password为了安全起见,存到一个环境变量里,环境变量只要作业运行结束,它就消失了。

Topic 也是在 control center 去创建的,checkpoint 是在消费 kafka 数据的时候,会去记录消费的 offset ,及时地去记录,如果作业中断了或者异常停止,还可以继续重启这个作业后,可以继续从对应的 offse 去消费, Data Lake 就是存储数据的地方,之后就可以使用Spark Kafka connect

image.png

去读取 Kafka 中的数据,读取过来之后,数据是 KV 队的一个形式的,数据是存储在 value 里的,所以在这里创建了一列称为 data data 从这个 value 里面去取值, value 是一个二进制形式,需要先把它转成 string ,转成后是 json 的串,然后把里面的数据都给 select 出来, select 出来后就把它写入到 data 表中,把它运行起来运行起来后,简单的 select 几条数据出来:Select * from taxi_record limit 10;

OSS browser 里, ingest 过来的数据,还有 ingest 的过程中产生的这些,这个 checkpoint 的信息。数据已经 ingest过来,应该是在慢慢增加的。

image.png

刷新后可以以看到数据已经 ingest 到了 lake 中了,我们使用 spark circle 去做简单的数据分析和可视化的展示。

image.png

三、分析


现在统计每个月的交易量,从这张交易量走势图可以看出,在3 4 5 6月,交易量比较高,在8月会达到一个交易量的低谷。

image.png

再来统计在每个周内,从周一到周日的交易量的变化,那从这张图可以看出周二是交易量的低谷,周二后会有一个慢慢的攀升一个过程,到周六、周日会达到一个交易量的高峰,周一到周二又降到了低谷。

image.png

统计一天中每个时间段的交易量,从这张图可以看出就是每天的11点,每天中午和下午是交易量的低谷,会造成这样销量低谷的原因,是否与交易价格有关系,所以来看每个时间段打车的费用。

image.png

从这张图可以看出在中午打车的费用比较高,所以和上一张图的结论吻合,是因为打车价格攀升,导致交易量的萎靡。

image.png

来统计每个年份打车费用的均值,从上图可以看出打车的费用连年上升。

image.png

从上图可以看出每个年份的交易量和打车的费用是成反比的关系,价格的攀升导致了交易量的下滑。

image.png

从上图看教育金额的分布,出租车打车价格是主要是分布在3美元到20美元之间。

在以上的查询分析的过程中,流式处理作业是一直处于运行状态,也就是意味着流式处理作业的运行和下方批式处理作业没有冲突,可以并行运行。


四、总结


模拟了实时数据的产生、采集,并且把这些数据发布到了 Kafka Broker 中,之后使用了 DDI 消费 kafka Broker 中的数据实现了实时的数据入湖,并且使用了DDI Zeplin ,基于入湖的实时数据,做了简单的分析。

相关文章
|
17天前
|
消息中间件 存储 网络协议
【Kafka】Kafka 性能高的原因分析
【4月更文挑战第5天】【Kafka】Kafka 性能高的原因分析
|
2月前
|
消息中间件 存储 缓存
玩转Kafka—Kafka高性能原因分析
玩转Kafka—Kafka高性能原因分析
27 0
|
11天前
|
消息中间件 存储 Kafka
【Kafka】Replica、Leader 和 Follower 三者的概念分析
【4月更文挑战第11天】【Kafka】Replica、Leader 和 Follower 三者的概念分析
|
13天前
|
机器学习/深度学习 人工智能 安全
Azure Databricks实战:在云上轻松进行大数据分析与AI开发
【4月更文挑战第8天】Databricks在大数据分析和AI开发中表现出色,简化流程并提高效率。文中列举了三个应用场景:数据湖分析、实时流处理和AI机器学习,并阐述了Databricks的一体化平台、云原生弹性及企业级安全优势。博主认为,Databricks提升了研发效能,无缝集成Azure生态,并具有持续创新潜力,是应对大数据挑战和加速AI创新的理想工具。
36 0
|
15天前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 的分区分配策略分析
【4月更文挑战第7天】【Kafka】Kafka 的分区分配策略分析
|
2月前
|
消息中间件 Java Kafka
【Kafka】Kafka-Server-start.sh 启动脚本分析(Ver 2.7.2)
【Kafka】Kafka-Server-start.sh 启动脚本分析(Ver 2.7.2)
33 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
63 0
|
4月前
|
分布式计算 BI 双11
StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)
StructredStreaming+Kafka+Mysql(Spark实时计算| 天猫双十一实时报表分析)
39 0
|
4月前
|
消息中间件 Kafka Shell
Linux【脚本 02】shell脚本离线安装配置Zookeeper及Kafka并添加service服务和开机启动(脚本分析)
Linux【脚本 02】shell脚本离线安装配置Zookeeper及Kafka并添加service服务和开机启动(脚本分析)
47 0
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
69 0