使用Spark Streaming SQL进行PV/UV统计

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor


1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。
使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

  • 创建E-MapReduce 3.23.0以上版本的Hadoop集群。
  • 下载并编译E-MapReduce-SDK包
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

  • 数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP) 
USING loghub 
OPTIONS(
sls.project=${sls.project},
sls.store=${sls.store},
access.key.id=${access.key.id},
access.key.secret=${access.key.secret},
endpoint=${endpoint});

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。
结果表定义如下

CREATE TABLE redis_sink 
USING redis 
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='user_ip');

其中,user_ip对应数据中的用户IP字段,配置项${redis_host}的值根据实际配置。

3.2创建流作业

CREATE SCAN loghub_scan
ON loghub_source
USING STREAM
OPTIONS(
watermark.column='__time__',
watermark.delayThreshold='10 second');
CREATE STREAM job
OPTIONS(
checkpointLocation=${checkpoint_location})
INSERT INTO redis_sink
SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看统计结果

最终的统计结果如下图所示

1

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

CREATE TABLE redis_sink
USING redis 
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='statistic_type');

创建流作业的SQL改为

CREATE STREAM job
OPTIONS(
checkpointLocation='/tmp/spark-test/checkpoint')
INSERT INTO redis_sink
SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;

最终的统计结果如下图所示

2

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码.JPG

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
48 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
103 0
|
1月前
|
SQL
开启慢SQL设置long_query_time=0.1为啥会统计的sql却存在小于100毫秒的sql
开启慢SQL设置long_query_time=0.1为啥会统计的sql却存在小于100毫秒的sql
35 1
|
1月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 存储 关系型数据库
mysql 数据库空间统计sql
mysql 数据库空间统计sql
50 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
61 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
55 0
|
2月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
57 0
|
2月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
41 0