使用 Databricks+Confluent 进行实时数据采集入湖和分析| 学习笔记

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 快速学习使用 Databricks+Confluent 进行实时数据采集入湖和分析

开发者学堂课程【Databricks数据洞察公开课使用 Databricks+Confluent 进行实时数据采集入湖和分析】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/1058/detail/15339


使用 Databricks+Confluent 进行实时数据采集入湖和分析

 

内容介绍:

一、   数据采集

二、   入湖

三、   分析

四、   总结

 

本节课主要介绍如何使用 DDI Confluent 进行实时数据采集、入湖和分析

 

一、数据采集


image.png

那在这张架构图中,在最左边可以看到一些 IoT 设备和移动应用,这些 IoT 设备和移动应用会采集设备或者应用的一些运行数据,再发布到 Kafka Broker中,发布完后,就可以在 DDI 中使用Spark Structured Streaming去消费 Kafka的数据,对这些实施的数据去做 ETL 的处理,再把处理的结果存储到 OSS 数据湖中,就可以使用 BI Tools 或者是机器学习的工具去展示这些数据,或者做一些机器学习的训练。

本次课程中会使用了一些 Python 的脚本,来周期性的去向 Kafka Broker 发送数据,以模拟数据的采集,数据的发送频率,大概是1万条每秒,就可以使用 DDI The Spark Structured Streaming ,去连接 Kafka Broker 去消费采集到的这些实时的数据再将这些实时的数据存储到 lake 数据湖中,此外,对采集到的数据做了简单的处理和分析,使用的是 Spark circle,对数据湖中的数据做简单的分析和简单的展示。

下图的数据集用来模拟实时数据,产生和采集的数据集在 Kaggle 上下载。

image.png

这个数据集是纽约市出租车的数据,数据集存储到 csv 文件里,第一列是它产生的时间也就是它的 key ,第二列是它的价格,就是这一次出租车交易的价格,第三列是乘客上车的时间,第第四列和第五列是上车的位置,上车的经纬度。

然后第六列和第七列是下车的经纬度,最后一列是乘客的数量,就要用这样一个数据集去模拟数据的产生。

先进入到 Confluent 的管控台,需要使用到的这个 Confluent 已经创建好了:Jinxi-DDI-test

image.png

在发送消息的时候需要使用到集群的 ID ab9b2deb067846e3

image.png

登录到 Confluent control center ,用户名和密码呢是在创建集群的时候设置好的,在 control center里会有这个 Confluent 集群的详细的信息,比如 Broker 的数量,和 Broker 的吞吐量,产生和消费的吞吐量,还有 Topic 的数量和 Partition 的数量等信息,先需要在 topic 这里去创建一个 topic

image.png

用来接收发布到这里的数据。

topic 的名称设置为 nyc_taxi_data partation 的数量设置成 3

image.png

topic 创建之后创建好之后,就可以往 topic 发布数据了,在发布数据的时候是需要用到集群 ID 的,再回到 Python 的脚本,在 Python 里需要提供集群的 ID control center username password ,为了安全性,把 password 存到另外一个 py 文件里,用户需要填充在control center username 下方,如下图:再看 topic 的名称。

image.png

创建 conf dct ,在 dct 里,需要指定 bootstrap.servers 地址,地址是根据 casp IP 拼接得到的,其他都是样本代码,之后会生成 producer producer 是来源于 Confluent kafka 库,用户是需要去安装这个库的,已经安装好之后

用户可以使用 conda 或者 python pip 进行安装。

image.png

之后把 True 文件打开,去一条一条的把 csv 文件里的数据读取出来,然后发送到 Confluent 集群里,如下图。

image.png

已经在生产消息了后,到 Confluent control center 去,可以发现已经在生产消息。

image.png


再进入 topic 里,点击 message ,直接跳到 offset 0最开始的消息,在 control center 就可以看到消息的一个生产情况以及具体产生了哪些消息。

还可以去定义消息的 Scheme,企业版 kafka ,能提供的高级特性,除了这个 Scheme registry 之外

image.png

企业版的 Confluent 还提供了connect ,还有 ksqIDB 的功能。

image.png

目前数据的采集和发布的路已经走通了,模拟数据的采集过程,并且把数据发布到kafka Broker上,下一步需要去 DDI  notebook 连接到 kafka Broker上,对kafka Broker 中的实时数据进行消费,进行ETL 的处理之后,把这些实时的数据做入湖的操作。

 

二、入湖


notebook 准备好,需要注意的是就是需要在这里增加一个 kafka Connect 的假包,如下图。

image.png

然后创建数据库,使用这个数据库去存储数据表。

按照之前的那个 taxi data schema去创建一个表,和之前在创建 csv 文件里的表是一致的。

image.png

首先 key是时间戳, fare_amount是打车的价格,之后就是上车的时间,上车的经纬度,下车的经纬度,还有乘客数量,是使用 delta 来存储的,使用 DDI 后,应该对这一个格式比较熟悉了,此外还设置了表的属性,这两个属性就是 DDI 引擎一些特性,第一个特性,会优化写,可把 OSS 的写入操作做合并,从而能防止小文件的产生。

image.png

OutCompact 因为流失处理会产生大量的小文件,这些小文件,如果不对它进行合并操作,会导致 OSS 中产生大量的小文件,然后导致元数据也变得非常大,一方面元数据会变得不可扩展,另一方面也会导致查询速度下降,outCompact会定期的把小文件合并,从而避免查询性能的下降以及元数据的扩展问题。

此后就可以连接到 Confluent 集群进行消费,这里消费的逻辑与发送的逻辑比较相近,需要给定 class idConfluent server username password   password为了安全起见,存到一个环境变量里,环境变量只要作业运行结束,它就消失了。

Topic 也是在 control center 去创建的,checkpoint 是在消费 kafka 数据的时候,会去记录消费的 offset ,及时地去记录,如果作业中断了或者异常停止,还可以继续重启这个作业后,可以继续从对应的 offse 去消费, Data Lake 就是存储数据的地方,之后就可以使用Spark Kafka connect

image.png

去读取 Kafka 中的数据,读取过来之后,数据是 KV 队的一个形式的,数据是存储在 value 里的,所以在这里创建了一列称为 data data 从这个 value 里面去取值, value 是一个二进制形式,需要先把它转成 string ,转成后是 json 的串,然后把里面的数据都给 select 出来, select 出来后就把它写入到 data 表中,把它运行起来运行起来后,简单的 select 几条数据出来:Select * from taxi_record limit 10;

OSS browser 里, ingest 过来的数据,还有 ingest 的过程中产生的这些,这个 checkpoint 的信息。数据已经 ingest过来,应该是在慢慢增加的。

image.png

刷新后可以以看到数据已经 ingest 到了 lake 中了,我们使用 spark circle 去做简单的数据分析和可视化的展示。

image.png

三、分析


现在统计每个月的交易量,从这张交易量走势图可以看出,在3 4 5 6月,交易量比较高,在8月会达到一个交易量的低谷。

image.png

再来统计在每个周内,从周一到周日的交易量的变化,那从这张图可以看出周二是交易量的低谷,周二后会有一个慢慢的攀升一个过程,到周六、周日会达到一个交易量的高峰,周一到周二又降到了低谷。

image.png

统计一天中每个时间段的交易量,从这张图可以看出就是每天的11点,每天中午和下午是交易量的低谷,会造成这样销量低谷的原因,是否与交易价格有关系,所以来看每个时间段打车的费用。

image.png

从这张图可以看出在中午打车的费用比较高,所以和上一张图的结论吻合,是因为打车价格攀升,导致交易量的萎靡。

image.png

来统计每个年份打车费用的均值,从上图可以看出打车的费用连年上升。

image.png

从上图可以看出每个年份的交易量和打车的费用是成反比的关系,价格的攀升导致了交易量的下滑。

image.png

从上图看教育金额的分布,出租车打车价格是主要是分布在3美元到20美元之间。

在以上的查询分析的过程中,流式处理作业是一直处于运行状态,也就是意味着流式处理作业的运行和下方批式处理作业没有冲突,可以并行运行。


四、总结


模拟了实时数据的产生、采集,并且把这些数据发布到了 Kafka Broker 中,之后使用了 DDI 消费 kafka Broker 中的数据实现了实时的数据入湖,并且使用了DDI Zeplin ,基于入湖的实时数据,做了简单的分析。

相关文章
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
35 4
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
52 1
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
72 0
|
2月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
69 3
|
3月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
69 2
|
3月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
3月前
|
机器学习/深度学习 数据采集 分布式计算
【颠覆传统!】揭秘Databricks如何助力零售业需求预测——从数据到洞察,一秒钟变销售预言家!
【8月更文挑战第9天】随着大数据技术的发展,数据驱动决策日益关键,尤其在零售业中,通过分析历史销售数据预测未来趋势变得至关重要。本文探讨如何运用Databricks平台优化零售业需求预测。Databricks是一个基于Apache Spark的统一数据分析平台,能高效处理大规模数据任务。通过示例代码展示数据读取、预处理及建模过程,相较于传统方法,Databricks在数据处理能力、可扩展性、内置机器学习库以及协作版本控制方面展现出显著优势,帮助零售商优化库存管理、提升客户体验并增加销售额。
87 8
|
存储 分布式计算 数据挖掘
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
|
存储 SQL 分布式计算
【数据湖仓】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓】数据湖和仓库:Databricks 和 Snowflake