请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
-- 模拟需求(有点牵强...): -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
-- 1. 创建user_blacklist表 CREATE TABLE user_blacklist
( user_id
bigint(20) NOT NULL, create_time
datetime NOT NULL, PRIMARY KEY (user_id
,create_time
) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO user_blacklist (user_id
, create_time
) VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04 00:00:00');
-- 2. 模拟kafka数据: -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"} -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}
-- 操作步骤: 当发送第1条kafka数据得到如下输出: | OP| user_id| event_type | current_ts| bl_count | | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | 当再次发送第1条kafka数据得到如下输出: | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
— SQL 如下:
create table kafka_user_event ( user_id
BIGINT, event_type
STRING, current_ts
timestamp(3), proc_time
AS PROCTIME() ) WITH ( 'connector' = 'kafka', ... );
create table mysql_user_blacklist ( user_id BIGINT, create_time timestamp(3), primary key (user_id,create_time) not enforced ) WITH ( 'connector' = 'jdbc', … );
create view v2_user_event as ( select t1.user_id
, t1.event_type
, t1.current_ts
, count(1) over ( partition by t2.user_id
order by t1.proc_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count from kafka_user_event t1 left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.proc_time
AS t2 on t1.user_id
= t2.user_id
where t1.event_type
= 'LOGIN' );
select * from v2_user_event;*来自志愿者整理的flink邮件归档
这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
table join 了。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。