GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~
1. 背景信息
在用户行为分析和圈人场景中,经常需要从亿级甚至几十亿级用户中快速筛选出符合特定标签的指标结果。UV 便是行为分析中最常见的指标,代表访问网页的自然人,可以引申为某段时间内某个指标精确去重后的量。例如大促时,电商商家需要实时计算店铺的实时 UV,并根据 UV 情况及时调整运营策略,从而达成销售目标。
在计算用户 UV 时,由于业务需求不同,计算的维度和数据量也不同,通常来讲,会有以下几点诉求:
- 用户数据量大,每天几亿条,维度多(10+ 以上),需要支持各维度间任意组合查询;
- 查询时间需要更灵活,不仅局限于天、周、月、年等,还需要支持更细粒度实时更新查询;
- 需要对用户数精确去重。
面对上诉高复杂度 UV 计算场景,业界常见的手段包括使用 Apche Kylin 等预计算系统或者 Flink + MySQL 的固定维度组合方案,但也会遇见以下几个痛点:
- 需求维度过多时,会带来存储爆炸,预计算时间长;
- 精确去重需要消耗大量资源,容易 OOM;
- 实时更新难,无法支持更加灵活开放的时间周期处理。
Hologres 是基于分析服务一体化理念(Hybrid Serving & Analytical Processing)设计的实时数仓产品,它采用分布式架构,支持数据实时写入,高并发、低延时的分析处理 PB 级数据,兼容 PostgreSQL 协议,使用最熟悉的工具就能进行开发。
Hologres 与实时计算Flink版有着强大的融合优化,支持 Flink 数据高通量实时写入,写入即可见;支持 Flink SQL 维表关联,以及作为 CDC Source 事件驱动开发;同时,Hologres 也支持 RoaringBitmap,利用其超高算法能力和高效 Bitmap 压缩能力,对用户标签筛选,对去重等场景有着高性能的支持。
在上诉 UV 计算场景中,可以使用实时计算Flink版 + Hologres 方式,并基于 RoaringBitmap,实时对用户标签去重。这样的方式,可以较细粒度的实时得到用户 UV、PV 数据,同时便于根据需求调整最小统计窗口(如最近 5 分钟的 UV),实现类似实时监控的效果,更好的在大屏等 BI 展示。相较于以天、周、月等单位的去重,更适合在活动日期进行更细粒度的统计,并且通过简单的聚合,也可以得到较大时间单位的统计结果。
该方案数据链路简单,可以任意维度灵活计算,只需要一份 Bitmap 存储,也没有存储爆炸的问题,还能保证实时更新,从而实现更实时、开发更灵活、功能更完善的多维分析数仓。
下面将会就 UV 计算场景讲解具体操作步骤。
2. 业务架构图
- 实时计算Flink版实时订阅实时采集的数据,数据源可以来源于日志数据,如 Kafka 等;
- 实时计算Flink版对数据做进一步加工处理;
- Hologres 对实时计算Flink版实时写入的数据实时查询;
- 最终查询的数据对接可视化工具,如 DataV 等,用作大屏展示。
3. 详细业务流程
- 实时计算Flink版将流式数据转化为表与维表进行 JOIN 操作,再转化为流式数据。此举可以利用 Hologres 维表的 insertIfNotExists 特性结合自增字段实现高效的 uid 映射;
- 实时计算Flink版把关联的结果数据按照时间窗口进行处理,根据查询维度使用 RoaringBitmap 进行聚合,并将查询维度以及聚合的 uid 存放在聚合结果表,其中聚合出的 uid 结果放入 Hologres 的 RoaringBitmap 类型的字段中;
- 查询时,与离线方式相似,直接按照查询条件查询聚合结果表,并对其中关键的 RoaringBitmap 字段做 or 运算后并统计基数,即可得出对应用户数;
- 具体数据处理流程如下图所示:
4. 方案最佳实践
4.1 前提条件
- 开通 Hologres 并连接开发工具,示例使用 holoweb,详情见 holoweb 快速入门
- 准备并搭建好 Flink 集群环境,您可以使用阿里云Flink全托管或者开源 Flink
4.2 操作步骤
4.2.1 Hologres 创建相关基础表
1)在 Hologres 创建表 uid_mapping 为 uid 映射表,用于映射 uid 到 32位 int 类型。如果原始 uid 已经是 int32 类型,此步骤可忽略。
- 常见的业务系统或者埋点中的用户 ID 很多是字符串类型或 Long 类型,因此需要使用 uid_mapping 类型构建一张映射表。 RoaringBitmap 类型要求用户 ID 必须是 32位 int 类型,而且越稠密越好(即用户 ID 最好连续)。映射表利用 Hologres 的 SERIAL 类型(自增的 32位 int)来实现用户映射的自动管理和稳定映射。
- 由于是实时数据, 在 Hologres 中该表为行存表,以提高 Flink 维表实时 JOIN 的 QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
2)创建表 dws_app 为基础聚合表,用于存放在基础维度上聚合后的结果。
- 使用 RoaringBitmap 前需要创建 RoaringBitmap extention,同时也需要 Hologres 实例为 0.10 及以上版本。
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
- 为了更好的性能,建议根据基础聚合表数据量合理的设置 Shard 数,但建议基础聚合表的 Shard 数设置不超过计算资源总 Core 数的 60%。推荐使用以下哨兵表方式,设置 Table Group 的 Shard 数。
--新建shard数为16的Table Group,
--本次测试数据量百万级,总计算资源为128core,设置shard数为16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
CALL set_table_property('tg16', 'shard_count', '16');
COMMIT;
- 相比离线结果表,此结果表增加了时间戳字段,用于实现以 Flink 窗口周期为单位的统计。结果表 DDL 如下:
BEGIN;
CREATE TABLE dws_app(
country text,
prov text,
city text,
ymd text NOT NULL, --日期字段
timetz TIMESTAMPTZ, --统计时间戳,可以实现以Flink窗口周期为单位的统计
uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uv
PRIMARY KEY (country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段设为clustering_key和event_time_column,便于过滤
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等价于将表放在shard数为16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段设为distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
4.2.2 实时计算Flink版实时读取数据并更新 dws_app 基础聚合表
在实时计算Flink版中的完整示例源码请见alibabacloud-hologres-connectors examples,下面是在实时计算Flink版中的具体操作步骤:
- Flink 流式读取数据源(DataStream),并转化为源表(Table)。
在 Flink 中使用流式读取数据源,数据源可以是 CSV 文件,也可以来源于 Kafka、Redis 等,可以根据业务场景准备,此次不再具体展开讲述。在 Flink 中转化为源表的代码示例如下:
//此处使用csv文件作为数据源,也可以是kafka/redis等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 与维表join需要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 注册到catalog环境
tableEnv.createTemporaryView("odsTable", odsTable);
在 Flink 中将源表与 Hologres 维表(uid_mapping)进行关联。
- 在 Flink 中创建 Hologres 维表,需要使用 insertIfNotExists 参数,即查询不到数据时自行插入,uid_int32 字段便可以利用 Hologres 的 Serial 类型自增创建。
- 将 Flink 源表与 Hologres 维表进行关联(JOIN)
// 创建Hologres维表,其中insertIfNotExists表示查询不到则自行插入
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s'," //Hologres DB名
+ " 'tablename' = '%s',"//Hologres 表名
+ " 'username' = '%s'," //当前账号access id
+ " 'password' = '%s'," //当前账号access key
+ " 'endpoint' = '%s'," //Hologres endpoint
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表join
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
- 将关联结果转化为 DataStream,通过 Flink 时间窗口处理,结合 RoaringBitmap 进行对指标进行去重处理。
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// 筛选需要统计的维度(country, prov, city, ymd)
.keyBy(0, 1, 2, 3)
// 滚动时间窗口;此处由于使用读取csv模拟输入流,采用ProcessingTime,实际使用中可使用EventTime
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 触发器,可以在窗口未结束时获取聚合结果
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
// 聚合函数,根据key By筛选的维度,进行聚合
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(
Tuple5<String, String, String, String, Integer> in,
RoaringBitmap acc) {
// 将32位的uid添加到RoaringBitmap进行去重
acc.add(in.f4);
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(
RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
//窗口函数,输出聚合结果
new WindowFunction<
RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Override
public void apply(
Tuple keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<
Tuple6<String, String, String, String, Timestamp, byte[]>> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// 优化RoaringBitmap
result.runOptimize();
// 将RoaringBitmap转化为字节数组以存入Holo中
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位
out.collect(
new Tuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(
timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
});
- 写入 Hologres 结果表。
经过 Flink 去重处理的数据写入至 Hologres 结果表 dws_app,但需要注意的是,Hologres 中 RoaringBitmap 类型在 Flink 中对应 Byte 数组类型。Flink 中代码如下:
// 计算结果转换为表
Table resTable =
tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入
String createHologresTable =
String.format(
"create table sink("
+ " country string,"
+ " prov string,"
+ " city string,"
+ " ymd string,"
+ " timetz timestamp,"
+ " uid32_bitmap BYTES"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'connectionSize' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'"
+ ")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);
4.2.3 数据查询
在 Hologres 中对结果表(dws_app)进行 UV 计算。按照查询维度做聚合计算,查询 Bitmap 基数,得出 Group by 条件下的用户数
- 示例查询 1:查询某天内各个城市的 uv。
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好,此步骤可选
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,city
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country
,prov
,city
;
- 示例查询 2:查询某段时间内各个省份的 UV、PV。
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好,此步骤可选
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
,SUM(1) AS pv
FROM dws_app
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
,prov
;
4.2.4 可视化展示
计算出 UV、PV 和大多数情况需要用 BI 工具以更直观的方式可视化展示,由于需要使用 RB_CARDINALITY 和 RB_OR_AGG 进行聚合计算,需要使用 BI 的自定义聚合函数的能力,常见的具备该能力的 BI 包括 Apache Superset 和 Tableau,下面将会讲述这两个 BI 工具的最佳实践。
使用 Apache Superset
Apache Superset 对接 Hologres 的方式,请参考产品手册。在 Superset 中可以直接使用 dws_app 表作为 Dataset 使用
并且在数据集中,创建一个单独 Metrics,名为 UV,表达式如下:
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))。
然后就可以开始探索数据了。
当然也可以创建 Dashborad。
使用 Tableau
Tableau 对接 Hologres 的方式,请参考产品手册。可以使用 Tableau 的直通函数直接实现自定义函数的能力,详细介绍请参照Tableau 的手册。在 Tableau 对接 Hologres 后,可以创建一个计算字段,表达式如下:
RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])。
然后就可以开始探索数据了。
当然也可以创建 Dashborad。
5. 总结
能支撑亿级用户的实时 UV 计算,是充分利用了实时计算Flink版和 Hologres 各自的优秀能力:
- 实时计算Flink版的实时事件驱动能力、丰富的窗口定义能力、可扩展的编程接口,支持了 RoaringBitmap 累加行为的定义;
- Hologres 实时更新能力、实时写入能力,保证了数据的高吞吐写入和生效;
- Hologres 内置 bitmap 数据类型,原生支持高效率的 RoaringBitmap 各类操作函数,保证了计算的高效率。
通过实时计算Flink版 + Hologres 实时更新微批次的统计结果,同时基于 bitmap 丰富的表达能力和极致的压缩效率,实现了存储不膨胀,计算高效率和全实时。让亿级用户的 UV 计算,不再是门槛高、消耗资源的复杂操作。基于 bitmap 数据结构也可以实时还原原始的数据状态,确保了数据的高质量、可回溯。更灵活的数据结构,给了用户更灵活的分析体验,让自助式多维分析成为每个运营同学的必备工具。
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc