作者:李锦桂 阿里云开源大数据平台开发工程师
本文将介绍使用 Python 脚本周期性地向 Kafka Brokers 发送数据,以模拟数据采集,数据发送频率约 1 万条/秒,并使用 DDI Spark Structured Streaming 连接 Kafka Brokers 消费采集到的实时数据,再将其存储到数据湖中。此外,我们还将使用Spark SQL 对采集到的数据做一些简单的处理、分析并进行展示。
上图最左侧为 IoT 设备和移动应用,负责采集设备或者应用的运行数据,发布至 Kafka Brokers。而后,即可在 DDI 中使用Spark Structured Streaming消费 Kafka 中的数据,对这些实时数据进行 ETL 等处理后,将结果存储到 OSS 数据湖,最终使用 BI Tools 或机器学习工具展示这些数据,或进行机器学习的训练。
本示例中模拟实时数据的数据集为纽约市出租车数据,存储在csv 文件内。数据中第一列是 key,为交易产生的时间,而后每一列分别为交易价格、乘客上车时间、上车的经纬度、下车的经纬度以及乘客的数量。
进入Confluent 管控台,Jinxi-DDI-test 为本次使用的 confluent 集群。
登录 Control Center,可以查看本集群的详细信息,比如 broker 数量、broker 产生和消费的吞吐量、topic 数量、partition 数量等信息。
创建一个用于接收数据的 topic ,名为nyc_taxi_data ,partition 数量设置为 3 ,与 broker 数量一致。
Python 脚本里需要提供集群 ID ,Control Center 的用户名、密码以及 topic 名称。创建一个名为 conf 的 dict ,指定 bootstrap server 的地址,此地址需要根据ClusterID 拼接而成,其他都为样板代码。
生成一个 producer,其来源于 confluent Kafka 库,因此用户需要安装此库。打开 train 文件,读取csv 文件里的数据并发送到 confluent 集群。
进入 Control Center ,可以看到已经开始生产消息。
进入 topic ,点击 message ,可以直接跳转到 offset0,即最开始的消息,可以查看消息的生产情况以及具体产生了哪些消息。
此外,企业版 Kafka 能提供一些高级特性,比如定义消息的 schema 、connect、ksqlDB 等功能。
至此,数据的采集和发布链路已经打通,下一步需要到 DDI Notebook 连接到 Kafka Broker,并对其中的实时数据进行消费、进行 ETL 处理,最终对这些实时数据进行入湖操作。
准备好 Notebook。
注意,此处需要再增加一个 Kafka connector jar包。创建数据库,使用数据库存储数据表。
按照 nyc_taxi_data 的 schema 创建表,字段与train.csv 文件里一致,并使用delta 来存储。
此外还设置了表的属性,主要与 DDI 引擎的特性相关。第一个特性为对 OSS 的写入操作做合并,从而防止小文件的产生,避免元数据不可扩展以及查询速度下降;第二个特性为定期将小文件进行合并,以避免查询性能下降以及元数据的扩展出现问题。
连接到 confluent 集群进行消费。消费逻辑与发送逻辑较为相近,需要给定 confluent 的 Cluster ID 、server、username、password以及topic。为了安全起见,需要将密码存储至环境变量中,它会随着作业运行结束而消失。Checkpoint_location 会记录消费 Kafka 数据时的 offset ,如果作业中断或异常停止,重启后可以从对应的 offset 继续消费。
使用 Spark 到 Kafka 的 connector 读取 Kafka 中数据。数据为 key-value 的形式,存储在 value 里,因此需要创建一列 data ,负责从 value 里取值。 value 为二进制形式,需要先将其转换成 string ,形成 JSON 串,而后通过 select 将里面的数据读取出来,并写入到data 表。
运行代码,启动程序。
Select 若干数据进行查看。在 OSS 的 browser 里可以看到注入的数据以及过程中产生的 checkpoint 信息。
刷新后可以看到,数据已经注入到 datalake 中。
接下来使用 Spark SQL 做一些简单的数据分析和可视化展示。
首先统计每个月的交易量。
从交易量的走势图可以看出, 3、4、5、6月的交易量较高,8月达到低谷。
统计在每周内从周一到周日的交易量变化,如上图。
统计一天中每个时间段的交易量,如上图。可以看出每天中午和下午为交易量的低谷。
统计每个时间段打车的费用,如上图。可以看出中午打车费用较高。因此可以得出结论:由于打车价格攀升导致了交易量萎靡。
统计每个年份打车费用的均值,如上图左,显示为打车费用连年上升。再统计每个年份的交易量,如上图右,可以得出结论:交易量和打车费用成反比关系。
统计交易金额的分布,如上图。可以看出,纽约市打车价格主要分布在 3-20 美元之间。
在以上查询分析的过程中,流式处理作业一直处于运行状态,与批式作业没有冲突,可以并行运行。
以上内容模拟了实时数据的产生、采集,并将其发布到 Kafka Brokers 中,之后使用 DDI 消费 Kafka Brokers 中的数据,实现了实时数据入湖,并且使用 DDI 的Zeppelin 基于入湖的实时数据做一些简单分析。
产品技术咨询
https://survey.aliyun.com/apps/zhiliao/VArMPrZOR
加入技术交流群