Flink 实例:电商用户行为实时分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 实例:电商用户行为实时分析

image.png


【示例 1】综合运用 Flink 的各种 API,基于 EventTime 实现分析电商用户行为。

电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。

电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、单击和浏览页面、页面停留时间及页面跳转等,可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从 web 服务器日志中直接读取,而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,一般会在业务系统中相应的位置埋点,然后收集日志进行分析。业务行为数据又可以简单分为两类:一类是能够明显地表现出用户兴趣的行为,例如对商品的收藏、喜欢、评分和评价,可以从中对数据进行深入分析,得到用户画像,进而对用户给出个性化的推荐商品列表,这个过程往往会用到机器学习相关的算法;另一类则是常规的业务操作,但需要着重关注一些异常状况以做好风控,例如登录和订单支付。

本项目限于数据,只实现实时热门商品统计。
# 01、数据源说明
本案例使用阿里天池的一份淘宝用户行为数据集,格式为 csv 文件。本数据集包含了 2023 年 1 月 25 日至 2023 年 5 月 3 日之间,约一百万随机用户的所有行为(行为包括单击、购买、加购、喜欢)。数据集的每行表示一条用户行为,由用户 ID、商品 ID、商品类目 ID、行为类型和时间戳组成,并以逗号分隔。

注意/



从天池上下载的数据集 UserBehavior.csv 解压缩后为 3.41G,这里截取其中一部分(485730 行)用于开发测试,命名为 UserBehavior_part.csv。

关于数据集中每列的详细描述,见表 1。

■ 表 1 淘宝用户行为数据集说明


image.png


其中用户行为类型共有 4 种,它们分别是:

(1) pv:商品详情页 pv,等价于单击。

(2) buy:商品购买。

(3) cart:将商品加入购物车。

(4) fav:收藏商品。

部分用户行为数据示例如下:
js 543462,1715,1464116,pv,1511658000 662867,2244074,1575622,pv,1511658000 561558,3611281,965809,pv,1511658000 894923,3076029,1879194,pv,1511658000 834377,4541270,3738615,pv,1511658000 ...
# 02、预备知识
Flink SQL 提供了如下一些与日期处理有关的函数。

## 1)TO_TIMESTAMP
将 BIGINT 类型的日期或者 VARCHAR 类型的日期转换成 TIMESTAMP 类型,其语法如下:
js TIMESTAMP TO_TIMESTAMP(BIGINT time) //time:毫秒 TIMESTAMP TO_TIMESTAMP(VARCHAR date) //date:yyyy-MM-dd HH:mm:ss TIMESTAMP TO_TIMESTAMP(VARCHAR date, VARCHAR format)
## 2)FROM_UNIXTIME
返回值为 VARCHAR 类型的日期值,默认日期格式:yyyy-MM-dd HH:mm:ss,若指定日期格式则按指定格式输出。如果任一输入参数是 NULL,返回 NULL,其语法如下:
js VARCHAR FROM_UNIXTIME(BIGINT UNIXtime[, VARCHAR format])
说明/



(1) 参数 UNIXtime 为长整型,是以秒为单位的时间戳。

(2) 参数 format 可选,为日期格式,默认格式为 yyyy-MM-dd HH:mm:ss,表示返回 VARCHAR 类型的符合指定格式的日期,如果有参数为 null 或解析错误,则返回 null。

# 03、任务实现


(1) 将用户行为数据采集到 Kafka。



(2) 使用 Table API 读取 Kafka 并写入 MySQL。



(3)用 Grafana 实时可视化显示。



首先,项目添加依赖。因为要读取的 Kafka 的用户行为事件是 CSV 格式的,所以在项目的 pom.xml 文件中添加如下的依赖:
js <!-- 需要添加flink-csv依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.13.2</version> </dependency>
创建 Kafka 源表的 SQL 语句如下:
js CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime AS PROCTIME(), -- 使用计算列生成处理时间属性 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 定义ts列上的水印, 将ts标记为事件时间属性 ) WITH ( 'connector' = 'kafka', -- 使用kafka连接器 'topic' = 'user_behavior', -- kafka topic 'scan.startup.mode' = 'earliest-offset', -- 从头开始读取 'properties.Bootstrap.servers' = 'kafka:9094', -- kafka broker 地址 'format' = 'json' -- 数据格式 );
在上面的 SQL 语句中,按照数据的格式声明了 5 个字段,除此之外,还通过计算列语法和 PROCTIME() 内置函数声明了一个产生处理时间的虚拟列。另外通过 WATERMARK 语法,在 ts 字段上声明了 watermark 策略(容忍 5s 乱序),ts 字段因此也成了事件时间列。

接下来,编写流处理代码。

Scala 代码实现:
js import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object UserBehaviorDemo { def main(args: Array[String]) { //设置流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //创建表环境 val tEnv = StreamTableEnvironment.create(env) //启用检查点//启用检查点,并设置并行度为1 env.enableCheckpointing(5000).setParallelism(1) //创建Kafka源表 tEnv.executeSql( """ |CREATE TABLE user_behavior ( | user_id bigint, | item_id bigint, | category_id bigint, | behavior string, | behavior_time bigint, | ts AS TO_TIMESTAMP(FROM_UNIXTIME(behavior_time, 'yyyy-MM-dd HH:mm:ss')), | proctime AS PROCTIME(), | WATERMARK FOR ts AS ts - INTERVAL '5' SECONDS |) |with( | 'connector'='kafka', | 'topic'='user_behavior', | 'properties.Bootstrap.servers'='localhost:9092', | 'properties.group.id'='testGroup', | 'scan.startup.mode'='latest-offset', | 'format'='csv', | 'csv.ignore-parse-errors' = 'true', | 'csv.field-delimiter'=',' |) """.stripMargin) //创建mysql sink表 tEnv.executeSql( """ |CREATE TABLE buy_cnt_per_hour ( | hour_of_day TIMESTAMP(3), | buy_cnt BIGINT |) |with( | 'connector'='JDBC', | 'url'='JDBC:mysql://localhost:3306/xueai8?useSSL=false', | 'table-name'='buy_cnt_per_hour', | 'driver'='com.mysql.JDBC.Driver', | 'username'='root', | 'password'='admin' |) """.stripMargin) tEnv //读取源表 .from("user_behavior") //定义大小为5s,滑动为2s 的滑动窗口 .window(Tumble over 1.minute on $"ts" as $"w") //分组 .groupBy($"w") //聚合 .select($"w".start.as("hour_of_day"),$"item_id".count.as("buy_cnt")) //.execute.print() //写入sink表 .executeInsert("buy_cnt_per_hour") } }
Java 代码如下:
js import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.util.TimeUtils; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit; public class UserBehaviorDemo { public static void main(String[] args) throws Exception { //设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建表环境 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //启用检查点,并设置并行度为1 env.enableCheckpointing(1000).setParallelism(1); //创建Kafka源表 tEnv.executeSql( "CREATE TABLE user_behavior (" + " user_id bigint," + " item_id bigint," + " category_id bigint," + " behavior string," + " behavior_time bigint," + " ts AS TO_TIMESTAMP(FROM_UNIXTIME(behavior_time, 'yyyy-MM-dd HH:mm:ss'))," + " proctime AS PROCTIME()," + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" + ")" + "WITH (" + " 'connector'='kafka'," + " 'topic'='user_behavior'," + " 'properties.Bootstrap.servers'='192.168.190.133:9092'," + " 'properties.group.id'='testGroup'," + " 'scan.startup.mode'='latest-offset'," + " 'format'='csv'," + " 'csv.ignore-parse-errors' = 'true'," + " 'csv.field-delimiter'=','" + ")" ); //创建mysql sink表 tEnv.executeSql("CREATE TABLE buy_cnt_per_hour (" + " hour_of_day TIMESTAMP(3)," + " buy_cnt BIGINT" + ") WITH (" + " 'connector' = 'JDBC'," + " 'url' = 'JDBC:mysql://localhost:3306/xueai8?useSSL=false'," + " 'table-name' = 'buy_cnt_per_hour'," + " 'driver' = 'com.mysql.JDBC.Driver'," + " 'username' = 'root'," + " 'password' = 'admin'" + ")" ); //统计每分钟的成交量 /* 统计每小时的成交量就是每小时共有多少 "buy" 的用户行为。 因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。 ,然后每个窗口分别统计 "buy" 的个数,这可以通过先过滤出 "buy" 的数据,然后 COUNT(*) 实现。 SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR); */ /* 在MySQL数据库中创建相应的结果表: CREATE TABLE xueai8.buy_cnt_per_hour ( hour_of_day timestamp, buy_cnt bigint ) */ tEnv //读取源表 .from("user_behavior") //过滤出购买行为 .filter($("behavior").isEqual("buy")) //定义滚动窗口 .window(Tumble.over(lit(1).minute()).on($("ts")).as("w")) //分组 .groupBy($("w")) //聚合 .select($("w").start().as("hour_of_day"),$("item_id").count().as("buy_cnt")) //使用.extract(TimeIntervalUnit.MINUTE)抽取日期中的指定部分 .select($("w").start().extract(TimeIntervalUnit.MINUTE).as("hour_of_day"),$("item_id").count().as("buy_cnt")) .execute().collect().forEachRemaining(System.out::println); //写入sink表 .executeInsert("buy_cnt_per_hour"); } }
# 04、执行过程
请按以下步骤执行程序。

(1) 启动 Kafka。

首先打开一个终端,运行 ZooKeeper,命令如下:
js $ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
另打开一个终端,运行 Kafka 服务器,命令如下:

$ ./bin/kafka-server-start.sh ./config/server.properties


再打开一个终端,创建名为 user_behavior 的 Kafka 主题,命令如下:

$./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_behavior


查看已经存在的主题,命令如下:

$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181


(2) 在 MySQL 上创建接收表,SQL 语句如下:

CREATE TABLE buy_cnt_per_hour (
hour_of_day timestamp,
buy_cnt bigint
);


(3) 运行可视化终端 Grafana。

Grafana 是一款用 Go 语言开发的开源数据可视化工具,可以做数据监控和数据统计,带有告警功能。

首先安装 Grafana,安装步骤如下:

① 下载并安装包,然后解压缩即可。

② 启动。在命令行启动 Grafana 服务器,命令如下:
js E:\BigData\Grafana\grafana-7.5.0\bin>grafana-server.exe
③ 然后打开浏览器,访问地址 http://localhost:3000/。

④ 在 Grafana 中要先配置好数据源,指向 MySQL 中的接收表,如图 1 所示。

image.png


■图 1 在 Grafana 中要先配置好数据源

⑤ 在 Grafana 中创建 dashboard,查询获得数据,使用的 SQL 语句如下:

SELECT
UNIX_TIMESTAMP(hour_of_day) as time_sec,
     buy_cnt as value
FROM xueai8.buy_cnt_per_hour

为了更好地看到动态变化的效果,设置一个仪表板显示的时间范围,如图 2 所示。

image.png


■ 图 2 在 Grafana 中设置一个仪表板显示的时间范围

(4) 运行 Flink 流程序。

(5) 执行数据生产者脚本 streamuserbehavior.sh。它调用 Kafka 自带的生产者脚本,以每秒 10 条的速度将数据发送给 Kafka 的 user_behavior 主题。编辑生产数据的脚本文件 streamuserbehavior.sh,代码如下:

#!/bin/bash
BROKER=$1
if [ -z "$1" ]; then
        BROKER="localhost:9092"
fi

cat UserBehavior_part.csv | while read line; do
        echo $line
        sleep 0.1
done | ~/bigdata/kafka_2.11-2.4.1/bin/kafka-console-producer.sh --broker-list $BROKER --topic user_behavior

执行该脚本,使用的命令如下

$ ./streamuserbehavior.sh

(6) 观察 Grafana 中数据实时呈现效果,如图 3 所示。

image.png


■ 图 3 用户购买量实时统计结果显示

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
3月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
91 5
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
307 2
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
463 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
3月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
100 0
|
5月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
98 1
|
5月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
6月前
|
canal 监控 关系型数据库
实时计算 Flink版产品使用问题之如何在实例里配置监控哪些库,哪些表,包括黑名单,白名单
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
监控 关系型数据库 MySQL
Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)
Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)
124 1
|
7月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。