Flink+Hologres亿级用户实时UV精确去重最佳实践

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
简介: Flink+Hologres亿级用户实时UV精确去重最佳实践

UV、PV计算,因为业务需求不同,通常会分为两种场景:

  • 离线计算场景:以T+1为主,计算历史数据
  • 实时计算场景:实时计算日常新增的数据,对用户标签去重

针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合结果表,就能达到亚秒级查询。具体详情可以参见往期文章>>Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)

对于实时计算场景,可以使用Flink+Hologres方式,并基于RoaringBitmap,实时对用户标签去重。这样的方式,可以较细粒度的实时得到用户UV、PV数据,同时便于根据需求调整最小统计窗口(如最近5分钟的UV),实现类似实时监控的效果,更好的在大屏等BI展示。相较于以天、周、月等为单位的去重,更适合在活动日期进行更细粒度的统计,并且通过简单的聚合,也可以得到较大时间单位的统计结果。

主体思想

  1. Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举可以利用Hologres维表的insertIfNotExists特性结合自增字段实现高效的uid映射。
  2. Flink把关联的结果数据按照时间窗口进行处理,根据查询维度使用RoaringBitmap进行聚合,并将查询维度以及聚合的uid存放在聚合结果表,其中聚合出的uid结果放入Hologres的RoaringBitmap类型的字段中。
  3. 查询时,与离线方式相似,直接按照查询条件查询聚合结果表,并对其中关键的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。
  4. 处理流程如下图所示

0.jpeg


方案最佳实践

1.创建相关基础表

1)创建表uid_mapping为uid映射表,用于映射uid到32位int类型。

  • RoaringBitmap类型要求用户ID必须是32位int类型且越稠密越好(即用户ID最好连续)。常见的业务系统或者埋点中的用户ID很多是字符串类型或Long类型,因此需要使用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的自动管理和稳定映射。
  • 由于是实时数据, 设置该表为行存表,以提高Flink维表实时JOIN的QPS。
BEGIN;CREATETABLE public.uid_mapping(uid textNOTNULL,uid_int32 serial,PRIMARY KEY (uid));--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值CALL set_table_property('public.uid_mapping','clustering_key','uid');CALL set_table_property('public.uid_mapping','distribution_key','uid');CALL set_table_property('public.uid_mapping','orientation','row');COMMIT;


2)创建表dws_app为基础聚合表,用于存放在基础维度上聚合后的结果。

  • 使用RoaringBitmap前需要创建RoaringBitmap extention,同时也需要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
  • 为了更好性能,建议根据基础聚合表数据量合理的设置Shard数,但建议基础聚合表的Shard数设置不超过计算资源的Core数。推荐使用以下方式通过Table Group来设置Shard数
--新建shard数为16的Table Group,--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16BEGIN;CREATETABLE tg16 (a int);--Table Group哨兵表call set_table_property('tg16','shard_count','16');COMMIT;
  • 相比离线结果表,此结果表增加了时间戳字段,用于实现以Flink窗口周期为单位的统计。结果表DDL如下:
BEGIN;createtable dws_app(  country text,  prov text,  city text,  ymd textNOTNULL,--日期字段  timetz TIMESTAMPTZ,--统计时间戳,可以实现以Flink窗口周期为单位的统计  uid32_bitmap roaringbitmap,-- 使用roaringbitmap记录uv  primary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据);CALL set_table_property('public.dws_app','orientation','column');--日期字段设为clustering_key和event_time_column,便于过滤CALL set_table_property('public.dws_app','clustering_key','ymd');CALL set_table_property('public.dws_app','event_time_column','ymd');--等价于将表放在shard数为16的table groupcall set_table_property('public.dws_app','colocate_with','tg16');--group by字段设为distribution_keyCALL set_table_property('public.dws_app','distribution_key','country,prov,city');COMMIT;

2.Flink实时读取数据并更新dws_app基础聚合表

完整示例源码请见alibabacloud-hologres-connectors examples

1)Flink 流式读取数据源(DataStream),并转化为源表(Table)

//此处使用csv文件作为数据源,也可以是kafka等DataStreamSourceodsStream=env.createInput(csvInput, typeInfo);
// 与维表join需要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.htmlTableodsTable=tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 注册到catalog环境tableEnv.createTemporaryView("odsTable", odsTable);

2)将源表与Hologres维表(uid_mapping)进行关联

其中维表使用insertIfNotExists参数,即查询不到数据时自行插入,uid_int32字段便可以利用Hologres的serial类型自增创建。

// 创建Hologres维表,其中nsertIfNotExists表示查询不到则自行插入StringcreateUidMappingTable=String.format(
"create table uid_mapping_dim("+"  uid string,"+"  uid_int32 INT"+") with ("+"  'connector'='hologres',"+"  'dbname' = '%s',"//Hologres DB名+"  'tablename' = '%s',"//Hologres 表名+"  'username' = '%s',"//当前账号access id+"  'password' = '%s',"//当前账号access key+"  'endpoint' = '%s',"//Hologres endpoint+"  'insertifnotexists'='true'"+")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表joinStringodsJoinDim="SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"+"  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"+"  ON ods.uid = dim.uid";
TablejoinRes=tableEnv.sqlQuery(odsJoinDim);


3)将关联结果转化为DataStream,通过Flink时间窗口处理,结合RoaringBitmap进行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>>processedSource=source// 筛选需要统计的维度(country, prov, city, ymd)    .keyBy(0, 1, 2, 3)
// 滚动时间窗口;此处由于使用读取csv模拟输入流,采用ProcessingTime,实际使用中可使用EventTime    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 触发器,可以在窗口未结束时获取聚合结果    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
    .aggregate(
// 聚合函数,根据key By筛选的维度,进行聚合newAggregateFunction<Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@OverridepublicRoaringBitmapcreateAccumulator() {
returnnewRoaringBitmap();
            }
@OverridepublicRoaringBitmapadd(
Tuple5<String, String, String, String, Integer>in,
RoaringBitmapacc) {
// 将32位的uid添加到RoaringBitmap进行去重acc.add(in.f4);
returnacc;
            }
@OverridepublicRoaringBitmapgetResult(RoaringBitmapacc) {
returnacc;
            }
@OverridepublicRoaringBitmapmerge(
RoaringBitmapacc1, RoaringBitmapacc2) {
returnRoaringBitmap.or(acc1, acc2);
            }
     },
//窗口函数,输出聚合结果newWindowFunction<RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Overridepublicvoidapply(
Tuplekeys,
TimeWindowtimeWindow,
Iterable<RoaringBitmap>iterable,
Collector<Tuple6<String, String, String, String, Timestamp, byte[]>>out)
throwsException {
RoaringBitmapresult=iterable.iterator().next();
// 优化RoaringBitmapresult.runOptimize();
// 将RoaringBitmap转化为字节数组以存入Holo中byte[] byteArray=newbyte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位out.collect(
newTuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
newTimestamp(
timeWindow.getEnd() /1000*1000),
byteArray));
        }
    });


4)写入结果表

需要注意的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型

// 计算结果转换为表TableresTable=tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入StringcreateHologresTable=String.format(
"create table sink("+"  country string,"+"  prov string,"+"  city string,"+"  ymd string,"+"  timetz timestamp,"+"  uid32_bitmap BYTES"+") with ("+"  'connector'='hologres',"+"  'dbname' = '%s',"+"  'tablename' = '%s',"+"  'username' = '%s',"+"  'password' = '%s',"+"  'endpoint' = '%s',"+"  'connectionSize' = '%s',"+"  'mutatetype' = 'insertOrReplace'"+")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表tableEnv.executeSql("insert into sink select * from "+resTable);

3.数据查询

查询时,从基础聚合表(dws_app)中按照查询维度做聚合计算,查询bitmap基数,得出group by条件下的用户数

  • 查询某天内各个城市的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好sethg_experimental_enable_force_three_stage_agg=offSELECTcountry        ,prov        ,city        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) ASuvFROMdws_appWHEREymd='20210329'GROUPBYcountry         ,prov         ,city;


  • 查询某段时间内各个省份的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好sethg_experimental_enable_force_three_stage_agg=offSELECTcountry        ,prov        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) ASuvFROMdws_appWHEREtime>'2021-04-19 18:00:00+08'andtime<'2021-04-19 19:00:00+08'GROUPBYcountry         ,prov;
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
相关文章
|
17天前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
217 1
Flink CDC + Hologres高性能数据同步优化实践
|
1月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
581 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
21天前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
2月前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
468 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
2月前
|
存储 消息中间件 OLAP
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
本次分享由阿里云产品经理骆撷冬(观秋)主讲,主题为“Hologres+Flink企业级实时数仓核心能力”,是2024实时数仓Hologres线上公开课的第三期。课程详细介绍了Hologres与Flink结合搭建的企业级实时数仓的核心能力,包括解决实时数仓分层问题、基于Flink Catalog的Streaming Warehouse实践,并通过典型客户案例展示了其应用效果。
74 10
Hologres+Flink企业级实时数仓核心能力介绍-2024实时数仓Hologres线上公开课03
|
3月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
245 14
|
4月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
4月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
7月前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8370 15
畅捷通基于Flink的实时数仓落地实践
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时数仓 Hologres