使用Databricks+Confluent进行实时数据采集入湖和分析【Databricks 数据洞察公开课】

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍网约车模拟数据从产生,发布到流数据服务 Confluent,通过Databricks Structured Streaming进行实时数据处理,存储到LakeHouse,并使用spark和spark sql进行分析的应用实践。

作者:李锦桂   阿里云开源大数据平台开发工程师


本文将介绍使用 Python 脚本周期性地向 Kafka Brokers 发送数据,以模拟数据采集,数据发送频率约 1 万条/秒,并使用 DDI Spark Structured Streaming 连接 Kafka Brokers 消费采集到的实时数据,再将其存储到数据湖中。此外,我们还将使用Spark SQL 对采集到的数据做一些简单的处理、分析并进行展示。


1.png

 

上图最左侧为 IoT 设备和移动应用,负责采集设备或者应用的运行数据,发布至 Kafka Brokers。而后,即可在 DDI 中使用Spark Structured Streaming消费 Kafka 中的数据,对这些实时数据进行 ETL 等处理后,将结果存储到 OSS 数据湖,最终使用 BI Tools 或机器学习工具展示这些数据,或进行机器学习的训练。


2.png


本示例中模拟实时数据的数据集为纽约市出租车数据,存储在csv 文件内。数据中第一列是 key,为交易产生的时间,而后每一列分别为交易价格、乘客上车时间、上车的经纬度、下车的经纬度以及乘客的数量。


3.png


进入Confluent 管控台,Jinxi-DDI-test 为本次使用的 confluent 集群。

4.png


登录 Control Center,可以查看本集群的详细信息,比如 broker 数量、broker 产生和消费的吞吐量、topic 数量、partition 数量等信息。


5.png


创建一个用于接收数据的 topic ,名为nyc_taxi_data partition 数量设置为 3 ,与 broker 数量一致。


6.png


Python 脚本里需要提供集群 ID Control Center 用户名、密码以及 topic 名称。创建一个名为 conf dict ,指定 bootstrap server 的地址,此地址需要根据ClusterID 拼接而成,其他都为样板代码。


生成一个 producer,其来源于 confluent Kafka 库,因此用户需要安装此库。打开 train 文件,读取csv 文件里的数据并发送到 confluent 集群。


7.png


进入 Control Center ,可以看到已经开始生产消息。


8.png


进入 topic ,点击 message 可以直接跳转到 offset0,即最开始的消息,可以查看消息的生产情况以及具体产生了哪些消息。


此外,企业版 Kafka 能提供一些高级特性,比如定义消息的 schema connectksqlDB 等功能。


至此,数据的采集和发布链路已经打通,下一步需要到 DDI Notebook 连接到 Kafka Broker,并对其中的实时数据进行消费、进行 ETL 处理,最终对这些实时数据进行入湖操作。

9.png


准备好 Notebook

注意,此处需要再增加一个 Kafka connector jar包。创建数据库,使用数据库存储数据表。


按照 nyc_taxi_data schema 创建表,字段与train.csv 文件里一致,并使用delta 来存储。


10.png


此外还设置了表的属性,主要与 DDI 引擎的特性相关。第一个特性为对 OSS 的写入操作做合并,从而防止小文件的产生,避免元数据不可扩展以及查询速度下降;第二个特性为定期将小文件进行合并,以避免查询性能下降以及元数据的扩展出现问题。


11.png


连接到 confluent 集群进行消费。消费逻辑与发送逻辑较为相近,需要给定 confluent Cluster ID serverusernamepassword以及topic为了安全起见,需要将密码存储至环境变量中,它会随着作业运行结束而消失。Checkpoint_location 会记录消费 Kafka 数据时的 offset ,如果作业中断或异常停止,重启后可以从对应的 offset 继续消费。


使用 Spark Kafka connector 读取 Kafka 中数据。数据为 key-value 的形式,存储在 value 里,因此需要创建一列 data ,负责 value 里取值。 value 为二进制形式,需要先将其转换成 string ,形成 JSON 串,而后通过 select 将里面的数据读取出来,并写入到data 表。


12.png


运行代码,启动程序。


Select 若干数据进行查看。在 OSS browser 里可以看到注入的数据以及过程中产生的 checkpoint 信息。


13.png

 

刷新后可以看到,数据已经注入到 datalake 中。


接下来使用 Spark SQL 做一些简单的数据分析和可视化展示。


首先统计每个月的交易量。


14.png


从交易量的走势图可以看出, 3、4、5、6月的交易量较高,8月达到低谷。


15.png


统计在每周内从周一到周日的交易量变化,如上图。

16.png


统计一天中每个时间段的交易量,如上图。可以看出每天中午和下午为交易量的低谷。

17.png


统计每个时间段打车的费用,如上图。可以看出中午打车费用较高。因此可以得出结论:由于打车价格攀升导致了交易量萎靡。


18.png


统计每个年份打车费用的均值,如上图左,显示为打车费用连年上升。再统计每个年份的交易量,如上图右,可以得出结论:交易量和打车费用成反比关系。

19.png


统计交易金额的分布,如上图。可以看出,纽约市打车价格主要分布在 3-20 美元之间。

在以上查询分析的过程中,流式处理作业一直处于运行状态,与批式作业没有冲突,可以并行运行。


以上内容模拟了实时数据的产生、采集,并将其发布到 Kafka Brokers 中,之后使用 DDI 消费 Kafka Brokers 中的数据,实现了实时数据入湖,并且使用 DDI Zeppelin 基于入湖的实时数据做一些简单分析。



产品技术咨询

https://survey.aliyun.com/apps/zhiliao/VArMPrZOR  

加入技术交流群

image.png

相关文章
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
35 4
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
54 1
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
73 0
|
2月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
70 3
|
3月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
70 2
|
3月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
56 1
|
3月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
3月前
|
机器学习/深度学习 数据采集 分布式计算
【颠覆传统!】揭秘Databricks如何助力零售业需求预测——从数据到洞察,一秒钟变销售预言家!
【8月更文挑战第9天】随着大数据技术的发展,数据驱动决策日益关键,尤其在零售业中,通过分析历史销售数据预测未来趋势变得至关重要。本文探讨如何运用Databricks平台优化零售业需求预测。Databricks是一个基于Apache Spark的统一数据分析平台,能高效处理大规模数据任务。通过示例代码展示数据读取、预处理及建模过程,相较于传统方法,Databricks在数据处理能力、可扩展性、内置机器学习库以及协作版本控制方面展现出显著优势,帮助零售商优化库存管理、提升客户体验并增加销售额。
87 8
|
存储 分布式计算 数据挖掘
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake
【数据湖仓架构】数据湖和仓库:Databricks 和 Snowflake