使用Spark Streaming SQL进行PV/UV统计

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor


1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。
使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

  • 创建E-MapReduce 3.23.0以上版本的Hadoop集群。
  • 下载并编译E-MapReduce-SDK包
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

  • 数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP) 
USING loghub 
OPTIONS(
sls.project=${sls.project},
sls.store=${sls.store},
access.key.id=${access.key.id},
access.key.secret=${access.key.secret},
endpoint=${endpoint});

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。
结果表定义如下

CREATE TABLE redis_sink 
USING redis 
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='user_ip');

其中,user_ip对应数据中的用户IP字段,配置项${redis_host}的值根据实际配置。

3.2创建流作业

CREATE SCAN loghub_scan
ON loghub_source
USING STREAM
OPTIONS(
watermark.column='__time__',
watermark.delayThreshold='10 second');
CREATE STREAM job
OPTIONS(
checkpointLocation=${checkpoint_location})
INSERT INTO redis_sink
SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看统计结果

最终的统计结果如下图所示

1

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

CREATE TABLE redis_sink
USING redis 
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='statistic_type');

创建流作业的SQL改为

CREATE STREAM job
OPTIONS(
checkpointLocation='/tmp/spark-test/checkpoint')
INSERT INTO redis_sink
SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;

最终的统计结果如下图所示

2

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
二维码.JPG

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
115 56
|
12天前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
10天前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL BI HIVE
【Hive SQL 每日一题】统计用户留存率
用户留存率是衡量产品成功的关键指标,表示用户在特定时间内持续使用产品的比例。计算公式为留存用户数除以初始用户数。例如,游戏发行后第一天有10000玩家,第七天剩5000人,第一周留存率为50%。提供的SQL代码展示了如何根据用户活动数据统计每天的留存率。需求包括计算系统上线后的每日留存率,以及从第一天开始的累计N日留存率。通过窗口函数`LAG`和`COUNT(DISTINCT user_id)`,可以有效地分析用户留存趋势。
|
2月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
75 2
|
1月前
|
SQL JSON 分布式计算
|
1月前
|
SQL 分布式计算 Java
|
2月前
|
SQL HIVE
【Hive SQL 每日一题】统计用户连续下单的日期区间
该SQL代码用于统计用户连续下单的日期区间。首先按`user_id`和`order_date`分组并去除重复,然后使用`row_number()`标记行号,并通过`date_sub`与行号计算潜在的连续日期。接着按用户ID和计算后的日期分组,排除连续订单数少于2的情况,最后提取连续下单的起始和结束日期。输出结果展示了用户连续下单的日期范围。
|
2月前
|
SQL 关系型数据库 HIVE
【Hive SQL 每日一题】统计最近1天/7天/30天商品的销量
这段内容是关于SQL查询的示例,目标是统计`sales`表中最近1天、7天和30天的商品销量和销售次数。表结构包含`id`、`product_id`、`quantity`和`sale_date`字段。初始查询方法通过三个独立的子查询完成,但效率较低。优化后的查询使用了`lateral view explode`将数据炸裂,通过一次查询同时获取所有所需时间段的数据,提高了效率。示例中展示了优化前后的SQL代码及结果对比。