Flink计算pv和uv的通用方法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。计算网站App的实时pv和uv,是很常见的统计需求,这里提供通用的计算方法,不同的业务需求只需要小改即可拿来即用。

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。


UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。


计算网站App的实时pv和uv,是很常见的统计需求,这里提供通用的计算方法,不同的业务需求只需要小改即可拿来即用。


需求


利用Flink实时统计,从0点到当前的pv、uv。


一、需求分析


Kafka发送过来的数据含有:时间戳时间维度用户id,需要从不同维度统计从0点到当前时间的pvuv,第二天0点重新开始计数第二天的。


二、技术方案


  • Kafka数据可能会有延迟乱序,这里引入watermark


  • keyBy分流进不同的滚动window,每个窗口内计算pvuv


  • 由于需要保存一天的状态,process里面使用ValueState保存pvuv


  • 使用BitMap类型ValueState,占内存很小,引入支持bitmap的依赖;


  • 保存状态需要设置ttl过期时间,第二天把第一天的过期,避免内存占用过大。


三、数据准备


这里假设是用户订单数据,数据格式如下:

{"time":"2021-10-31 22:00:01","timestamp":"1635228001","product":"苹果手机","uid":255420}
{"time":"2021-10-31 22:00:02","timestamp":"1635228001","product":"MacBook Pro","uid":255421}


四、代码实现


整个工程代码截图如下(抹去了一些不方便公开的信息):

微信图片_20220426180706.png


1. 环境


kafka:1.0.0;


Flink:1.11.0;


2. 发送测试数据


首先发送数据到kafka测试集群,maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>


发送代码:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import jodd.util.ThreadUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.io.*;
public class SendDataToKafka {
    @Test
    public void sendData() throws IOException {
        String inpath = "E:\\我的文件\\click.txt";
        String topic = "click_test";
        int cnt = 0;
        String line;
        InputStream inputStream = new FileInputStream(inpath);
        Reader reader = new InputStreamReader(inputStream);
        LineNumberReader lnr = new LineNumberReader(reader);
        while ((line = lnr.readLine()) != null) {
            // 这里的KafkaUtil是个生产者、消费者工具类,可以自行实现
            KafkaUtil.sendDataToKafka(topic, String.valueOf(cnt), line);
            cnt = cnt + 1;
            ThreadUtil.sleep(100);
        }
    }
}


3. 主要程序


先定义个pojo

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserClickModel {
    private String date;
    private String product;
    private int uid;
    private int pv;
    private int uv;
}


接着就是使用Flink消费kafka,指定Watermark,通过KeyBy分流,进入滚动窗口函数通过状态保存pvuv

public class UserClickMain {
    private static final Map<String, String> config = Configuration.initConfig("commons.xml");
    public static void main(String[] args) throws Exception {
        // 初始化环境,配置相关属性
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        senv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        senv.setStateBackend(new FsStateBackend("hdfs://bigdata/flink/checkpoints/userClick"));
        // 读取kafka
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
        kafkaProps.setProperty("group.id", config.get("kafka-groupid"));
        // kafkaProps.setProperty("auto.offset.reset", "earliest");
        // watrmark 允许数据延迟时间
        long maxOutOfOrderness = 5 * 1000L;
        SingleOutputStreamOperator<UserClickModel> dataStream = senv.addSource(
                new FlinkKafkaConsumer<>(
                        config.get("kafka-topic"),
                        new SimpleStringSchema(),
                        kafkaProps
                ))
                //设置watermark
                .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
                        .withTimestampAssigner((element, recordTimestamp) -> {
                            // 时间戳须为毫秒
                            return Long.valueOf(JSON.parseObject(element).getString("timestamp")) * 1000;
                        })).map(new FCClickMapFunction()).returns(TypeInformation.of(new TypeHint<UserClickModel>() {
                }));
        // 按照 (date, product) 分组
        dataStream.keyBy(new KeySelector<UserClickModel, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(UserClickModel value) throws Exception {
                return Tuple2.of(value.getDate(), value.getProduct());
            }
        })
                // 一天为窗口,指定时间起点比时间戳时间早8个小时
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                // 10s触发一次计算,更新统计结果
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
                // 计算pv uv
                .process(new MyProcessWindowFunctionBitMap())
                // 保存结果到mysql
                .addSink(new FCClickSinkFunction());
        senv.execute(UserClickMain.class.getSimpleName());
    }
}


代码都是一些常规代码,但是还是有几点需要注意的。


注意


  1. 设置watermark,flink1.11中使用WatermarkStrategy,老的已经废弃了;


  1. 我的数据里面时间戳是秒,需要乘以1000,flink提取时间字段,必须为毫秒


  1. .window只传入一个参数,表明是滚动窗口,TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))这里指定了窗口的大小为一天,由于中国北京时间是东8区,比国际时间早8个小时,需要引入offset,可以自行进入该方法源码查看英文注释。
Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.


  1. 一天大小的窗口,根据watermark机制一天触发计算一次,显然是不合理的,需要用trigger函数指定触发间隔为10s一次,这样我们的pvuv就是10s更新一次结果。


4. 关键代码,计算uv


由于这里用户id刚好是数字,可以使用bitmap去重,简单原理是:把 user_id 作为 bit 的偏移量 offset,设置为 1 表示有访问,使用 1 MB的空间就可以存放 800 多万用户的一天访问计数情况


redis是自带bit数据结构的,不过为了尽量少依赖外部存储媒介,这里自己实现bit,引入相应maven依赖即可:

<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.0</version>
</dependency>


计算pv、uv的代码其实都是通用的,可以根据自己的实际业务情况快速修改的:

public class MyProcessWindowFunctionBitMap extends ProcessWindowFunction<UserClickModel, UserClickModel, Tuple<String, String>, TimeWindow> {
    private transient ValueState<Integer> pvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
        ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor("bitMap"
                , TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));
        // 过期状态清除
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // 开启ttl
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
        pvState = this.getRuntimeContext().getState(pvStateDescriptor);
        bitMapState = this.getRuntimeContext().getState(bitMapStateDescriptor);
    }
    @Override
    public void process(Tuple2<String, String> key, Context context, Iterable<UserClickModel> elements, Collector<UserClickModel> out) throws Exception {
        // 当前状态的pv uv
        Integer pv = pvState.value();
        Roaring64NavigableMap bitMap = bitMapState.value();
        if(bitMap == null){
            bitMap = new Roaring64NavigableMap();
            pv = 0;
        }
        Iterator<UserClickModel> iterator = elements.iterator();
        while (iterator.hasNext()){
            pv = pv + 1;
            int uid = iterator.next().getUid();
            //如果userId可以转成long
            bitMap.add(uid);
        }
        // 更新pv
        pvState.update(pv);
        UserClickModel UserClickModel = new UserClickModel();
        UserClickModel.setDate(key.f0);
        UserClickModel.setProduct(key.f1);
        UserClickModel.setPv(pv);
        UserClickModel.setUv(bitMap.getIntCardinality());
        out.collect(UserClickModel);
    }
}


注意


  1. 由于计算uv第二天的时候,就不需要第一天数据了,要及时清理内存中前一天的状态,通过ttl机制过期;

  2. 最终结果保存到mysql里面,如果数据结果分类聚合太多,要注意mysql压力,这块可以自行优化;


五、其它方法


除了使用bitmap去重外,还可以使用Flink SQL,编码更简洁,还可以借助外面的媒介Redis去重:


  1. 基于 set


  1. 基于 bit


  1. 基于 HyperLogLog


  1. 基于bloomfilter


具体思路是,计算pvuv都塞入redis里面,然后再获取值保存统计结果,也是比较常用的。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
62 1
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
43 0
|
7月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之 为多个表指定 SourceFunction 方法和单个 SourceFunction 方法的优缺点是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
78 1
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之水位线的设置方法是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 网络安全 API
实时计算 Flink版产品使用问题之使用ProcessTime进行窗口计算,并且有4台机器的时间提前了2个小时,会导致什么情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 分布式计算 Hadoop
实时计算 Flink版操作报错合集之使用flink jar开发,报错:找不到main方法,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行DWS层的实时聚合计算时,遇到多次更新同一个字段的情况,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之有哪些方法可以实现整库同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
222 8