使用Databricks+Confluent进行实时数据采集入湖和分析【Databricks 数据洞察公开课】

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍网约车模拟数据从产生,发布到流数据服务 Confluent,通过Databricks Structured Streaming进行实时数据处理,存储到LakeHouse,并使用spark和spark sql进行分析的应用实践。

作者:李锦桂   阿里云开源大数据平台开发工程师


本文将介绍使用 Python 脚本周期性地向 Kafka Brokers 发送数据,以模拟数据采集,数据发送频率约 1 万条/秒,并使用 DDI Spark Structured Streaming 连接 Kafka Brokers 消费采集到的实时数据,再将其存储到数据湖中。此外,我们还将使用Spark SQL 对采集到的数据做一些简单的处理、分析并进行展示。


1.png

 

上图最左侧为 IoT 设备和移动应用,负责采集设备或者应用的运行数据,发布至 Kafka Brokers。而后,即可在 DDI 中使用Spark Structured Streaming消费 Kafka 中的数据,对这些实时数据进行 ETL 等处理后,将结果存储到 OSS 数据湖,最终使用 BI Tools 或机器学习工具展示这些数据,或进行机器学习的训练。


2.png


本示例中模拟实时数据的数据集为纽约市出租车数据,存储在csv 文件内。数据中第一列是 key,为交易产生的时间,而后每一列分别为交易价格、乘客上车时间、上车的经纬度、下车的经纬度以及乘客的数量。


3.png


进入Confluent 管控台,Jinxi-DDI-test 为本次使用的 confluent 集群。

4.png


登录 Control Center,可以查看本集群的详细信息,比如 broker 数量、broker 产生和消费的吞吐量、topic 数量、partition 数量等信息。


5.png


创建一个用于接收数据的 topic ,名为nyc_taxi_data partition 数量设置为 3 ,与 broker 数量一致。


6.png


Python 脚本里需要提供集群 ID Control Center 用户名、密码以及 topic 名称。创建一个名为 conf dict ,指定 bootstrap server 的地址,此地址需要根据ClusterID 拼接而成,其他都为样板代码。


生成一个 producer,其来源于 confluent Kafka 库,因此用户需要安装此库。打开 train 文件,读取csv 文件里的数据并发送到 confluent 集群。


7.png


进入 Control Center ,可以看到已经开始生产消息。


8.png


进入 topic ,点击 message 可以直接跳转到 offset0,即最开始的消息,可以查看消息的生产情况以及具体产生了哪些消息。


此外,企业版 Kafka 能提供一些高级特性,比如定义消息的 schema connectksqlDB 等功能。


至此,数据的采集和发布链路已经打通,下一步需要到 DDI Notebook 连接到 Kafka Broker,并对其中的实时数据进行消费、进行 ETL 处理,最终对这些实时数据进行入湖操作。

9.png


准备好 Notebook

注意,此处需要再增加一个 Kafka connector jar包。创建数据库,使用数据库存储数据表。


按照 nyc_taxi_data schema 创建表,字段与train.csv 文件里一致,并使用delta 来存储。


10.png


此外还设置了表的属性,主要与 DDI 引擎的特性相关。第一个特性为对 OSS 的写入操作做合并,从而防止小文件的产生,避免元数据不可扩展以及查询速度下降;第二个特性为定期将小文件进行合并,以避免查询性能下降以及元数据的扩展出现问题。


11.png


连接到 confluent 集群进行消费。消费逻辑与发送逻辑较为相近,需要给定 confluent Cluster ID serverusernamepassword以及topic为了安全起见,需要将密码存储至环境变量中,它会随着作业运行结束而消失。Checkpoint_location 会记录消费 Kafka 数据时的 offset ,如果作业中断或异常停止,重启后可以从对应的 offset 继续消费。


使用 Spark Kafka connector 读取 Kafka 中数据。数据为 key-value 的形式,存储在 value 里,因此需要创建一列 data ,负责 value 里取值。 value 为二进制形式,需要先将其转换成 string ,形成 JSON 串,而后通过 select 将里面的数据读取出来,并写入到data 表。


12.png


运行代码,启动程序。


Select 若干数据进行查看。在 OSS browser 里可以看到注入的数据以及过程中产生的 checkpoint 信息。


13.png

 

刷新后可以看到,数据已经注入到 datalake 中。


接下来使用 Spark SQL 做一些简单的数据分析和可视化展示。


首先统计每个月的交易量。


14.png


从交易量的走势图可以看出, 3、4、5、6月的交易量较高,8月达到低谷。


15.png


统计在每周内从周一到周日的交易量变化,如上图。

16.png


统计一天中每个时间段的交易量,如上图。可以看出每天中午和下午为交易量的低谷。

17.png


统计每个时间段打车的费用,如上图。可以看出中午打车费用较高。因此可以得出结论:由于打车价格攀升导致了交易量萎靡。


18.png


统计每个年份打车费用的均值,如上图左,显示为打车费用连年上升。再统计每个年份的交易量,如上图右,可以得出结论:交易量和打车费用成反比关系。

19.png


统计交易金额的分布,如上图。可以看出,纽约市打车价格主要分布在 3-20 美元之间。

在以上查询分析的过程中,流式处理作业一直处于运行状态,与批式作业没有冲突,可以并行运行。


以上内容模拟了实时数据的产生、采集,并将其发布到 Kafka Brokers 中,之后使用 DDI 消费 Kafka Brokers 中的数据,实现了实时数据入湖,并且使用 DDI Zeppelin 基于入湖的实时数据做一些简单分析。



产品技术咨询

https://survey.aliyun.com/apps/zhiliao/VArMPrZOR  

加入技术交流群

image.png

相关文章
|
3月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
77 3
|
4月前
|
机器学习/深度学习 数据采集 分布式计算
【颠覆传统!】揭秘Databricks如何助力零售业需求预测——从数据到洞察,一秒钟变销售预言家!
【8月更文挑战第9天】随着大数据技术的发展,数据驱动决策日益关键,尤其在零售业中,通过分析历史销售数据预测未来趋势变得至关重要。本文探讨如何运用Databricks平台优化零售业需求预测。Databricks是一个基于Apache Spark的统一数据分析平台,能高效处理大规模数据任务。通过示例代码展示数据读取、预处理及建模过程,相较于传统方法,Databricks在数据处理能力、可扩展性、内置机器学习库以及协作版本控制方面展现出显著优势,帮助零售商优化库存管理、提升客户体验并增加销售额。
102 8
|
4月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
65 1
|
4月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【8月更文挑战第9天】利用Databricks与Confluent打造实时数据处理方案。Confluent的Kafka负责数据采集,通过主题接收IoT及应用数据;Databricks运用Structured Streaming处理Kafka数据,并以Delta Lake存储,支持ACID事务。这套组合实现了从数据采集、存储到分析的全流程自动化,满足企业对大数据实时处理的需求。
52 3
|
7月前
|
消息中间件 数据采集 分布式计算
【数据采集与预处理】数据接入工具Kafka
【数据采集与预处理】数据接入工具Kafka
109 1
【数据采集与预处理】数据接入工具Kafka
|
7月前
|
人工智能 分布式计算 安全
Azure Databricks实战:在云上轻松进行大数据分析与AI开发
【4月更文挑战第9天】探索Microsoft Azure的Databricks服务,体验其在大数据分析和AI开发中的高效性能。此平台简化流程,提升效率,适用场景包括数据湖分析、实时流处理和AI开发。核心优势在于一体化平台设计、云原生的弹性伸缩和企业级安全保障。Databricks提升研发效能,无缝集成Azure生态,且持续创新,是应对大数据挑战和加速AI创新的理想工具。
693 1
|
7月前
|
机器学习/深度学习 人工智能 安全
Azure Databricks实战:在云上轻松进行大数据分析与AI开发
【4月更文挑战第8天】Databricks在大数据分析和AI开发中表现出色,简化流程并提高效率。文中列举了三个应用场景:数据湖分析、实时流处理和AI机器学习,并阐述了Databricks的一体化平台、云原生弹性及企业级安全优势。博主认为,Databricks提升了研发效能,无缝集成Azure生态,并具有持续创新潜力,是应对大数据挑战和加速AI创新的理想工具。
690 1
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Kafka Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的Kafka Channel,讲解其数据采集流程。
205 0
|
消息中间件 数据采集 关系型数据库
logstash集成kafka,mysql实现数据采集
logstash是一个非常灵活好用的数据采集框架工具,可以通过简单的配置满足绝大多数数据采集场景的需求。 采集数据一个非常典型的场景就是将数据先放到kafka队列里削峰,然后从kafka队列里读取数据到mysql或其他存储系统中进行保存。
252 0
|
存储 分布式计算 数据挖掘
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake