UV、PV计算,因为业务需求不同,通常会分为两种场景:
- 离线计算场景:以T+1为主,计算历史数据
- 实时计算场景:实时计算日常新增的数据,对用户标签去重
针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合结果表,就能达到亚秒级查询。具体详情可以参见往期文章>>Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)
对于实时计算场景,可以使用Flink+Hologres方式,并基于RoaringBitmap,实时对用户标签去重。这样的方式,可以较细粒度的实时得到用户UV、PV数据,同时便于根据需求调整最小统计窗口(如最近5分钟的UV),实现类似实时监控的效果,更好的在大屏等BI展示。相较于以天、周、月等为单位的去重,更适合在活动日期进行更细粒度的统计,并且通过简单的聚合,也可以得到较大时间单位的统计结果。
主体思想
- Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举可以利用Hologres维表的insertIfNotExists特性结合自增字段实现高效的uid映射。
- Flink把关联的结果数据按照时间窗口进行处理,根据查询维度使用RoaringBitmap进行聚合,并将查询维度以及聚合的uid存放在聚合结果表,其中聚合出的uid结果放入Hologres的RoaringBitmap类型的字段中。
- 查询时,与离线方式相似,直接按照查询条件查询聚合结果表,并对其中关键的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。
- 处理流程如下图所示
方案最佳实践
1.创建相关基础表
1)创建表uid_mapping为uid映射表,用于映射uid到32位int类型。
- RoaringBitmap类型要求用户ID必须是32位int类型且越稠密越好(即用户ID最好连续)。常见的业务系统或者埋点中的用户ID很多是字符串类型或Long类型,因此需要使用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的自动管理和稳定映射。
- 由于是实时数据, 设置该表为行存表,以提高Flink维表实时JOIN的QPS。
BEGIN;CREATETABLE public.uid_mapping(uid textNOTNULL,uid_int32 serial,PRIMARY KEY (uid));--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值CALL set_table_property('public.uid_mapping','clustering_key','uid');CALL set_table_property('public.uid_mapping','distribution_key','uid');CALL set_table_property('public.uid_mapping','orientation','row');COMMIT;
2)创建表dws_app为基础聚合表,用于存放在基础维度上聚合后的结果。
- 使用RoaringBitmap前需要创建RoaringBitmap extention,同时也需要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
- 为了更好性能,建议根据基础聚合表数据量合理的设置Shard数,但建议基础聚合表的Shard数设置不超过计算资源的Core数。推荐使用以下方式通过Table Group来设置Shard数
--新建shard数为16的Table Group,--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16BEGIN;CREATETABLE tg16 (a int);--Table Group哨兵表call set_table_property('tg16','shard_count','16');COMMIT;
- 相比离线结果表,此结果表增加了时间戳字段,用于实现以Flink窗口周期为单位的统计。结果表DDL如下:
BEGIN;createtable dws_app( country text, prov text, city text, ymd textNOTNULL,--日期字段 timetz TIMESTAMPTZ,--统计时间戳,可以实现以Flink窗口周期为单位的统计 uid32_bitmap roaringbitmap,-- 使用roaringbitmap记录uv primary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据);CALL set_table_property('public.dws_app','orientation','column');--日期字段设为clustering_key和event_time_column,便于过滤CALL set_table_property('public.dws_app','clustering_key','ymd');CALL set_table_property('public.dws_app','event_time_column','ymd');--等价于将表放在shard数为16的table groupcall set_table_property('public.dws_app','colocate_with','tg16');--group by字段设为distribution_keyCALL set_table_property('public.dws_app','distribution_key','country,prov,city');COMMIT;
2.Flink实时读取数据并更新dws_app基础聚合表
完整示例源码请见alibabacloud-hologres-connectors examples
1)Flink 流式读取数据源(DataStream),并转化为源表(Table)
//此处使用csv文件作为数据源,也可以是kafka等DataStreamSourceodsStream=env.createInput(csvInput, typeInfo); // 与维表join需要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.htmlTableodsTable=tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // 注册到catalog环境tableEnv.createTemporaryView("odsTable", odsTable);
2)将源表与Hologres维表(uid_mapping)进行关联
其中维表使用insertIfNotExists参数,即查询不到数据时自行插入,uid_int32字段便可以利用Hologres的serial类型自增创建。
// 创建Hologres维表,其中nsertIfNotExists表示查询不到则自行插入StringcreateUidMappingTable=String.format( "create table uid_mapping_dim("+" uid string,"+" uid_int32 INT"+") with ("+" 'connector'='hologres',"+" 'dbname' = '%s',"//Hologres DB名+" 'tablename' = '%s',"//Hologres 表名+" 'username' = '%s',"//当前账号access id+" 'password' = '%s',"//当前账号access key+" 'endpoint' = '%s',"//Hologres endpoint+" 'insertifnotexists'='true'"+")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); // 源表与维表joinStringodsJoinDim="SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"+" FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"+" ON ods.uid = dim.uid"; TablejoinRes=tableEnv.sqlQuery(odsJoinDim);
3)将关联结果转化为DataStream,通过Flink时间窗口处理,结合RoaringBitmap进行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>>processedSource=source// 筛选需要统计的维度(country, prov, city, ymd) .keyBy(0, 1, 2, 3) // 滚动时间窗口;此处由于使用读取csv模拟输入流,采用ProcessingTime,实际使用中可使用EventTime .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 触发器,可以在窗口未结束时获取聚合结果 .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( // 聚合函数,根据key By筛选的维度,进行聚合newAggregateFunction<Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { publicRoaringBitmapcreateAccumulator() { returnnewRoaringBitmap(); } publicRoaringBitmapadd( Tuple5<String, String, String, String, Integer>in, RoaringBitmapacc) { // 将32位的uid添加到RoaringBitmap进行去重acc.add(in.f4); returnacc; } publicRoaringBitmapgetResult(RoaringBitmapacc) { returnacc; } publicRoaringBitmapmerge( RoaringBitmapacc1, RoaringBitmapacc2) { returnRoaringBitmap.or(acc1, acc2); } }, //窗口函数,输出聚合结果newWindowFunction<RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { publicvoidapply( Tuplekeys, TimeWindowtimeWindow, Iterable<RoaringBitmap>iterable, Collector<Tuple6<String, String, String, String, Timestamp, byte[]>>out) throwsException { RoaringBitmapresult=iterable.iterator().next(); // 优化RoaringBitmapresult.runOptimize(); // 将RoaringBitmap转化为字节数组以存入Holo中byte[] byteArray=newbyte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位out.collect( newTuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), newTimestamp( timeWindow.getEnd() /1000*1000), byteArray)); } });
4)写入结果表
需要注意的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型
// 计算结果转换为表TableresTable=tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); // 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入StringcreateHologresTable=String.format( "create table sink("+" country string,"+" prov string,"+" city string,"+" ymd string,"+" timetz timestamp,"+" uid32_bitmap BYTES"+") with ("+" 'connector'='hologres',"+" 'dbname' = '%s',"+" 'tablename' = '%s',"+" 'username' = '%s',"+" 'password' = '%s',"+" 'endpoint' = '%s',"+" 'connectionSize' = '%s',"+" 'mutatetype' = 'insertOrReplace'"+")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); // 写入计算结果到dws表tableEnv.executeSql("insert into sink select * from "+resTable);
3.数据查询
查询时,从基础聚合表(dws_app)中按照查询维度做聚合计算,查询bitmap基数,得出group by条件下的用户数
- 查询某天内各个城市的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好sethg_experimental_enable_force_three_stage_agg=offSELECTcountry ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) ASuvFROMdws_appWHEREymd='20210329'GROUPBYcountry ,prov ,city;
- 查询某段时间内各个省份的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好sethg_experimental_enable_force_three_stage_agg=offSELECTcountry ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) ASuvFROMdws_appWHEREtime>'2021-04-19 18:00:00+08'andtime<'2021-04-19 19:00:00+08'GROUPBYcountry ,prov;