开发者社区> 问答> 正文

Flink sql 维表聚合问题请教

请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~

-- 模拟需求(有点牵强...): -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作

-- 1. 创建user_blacklist表 CREATE TABLE user_blacklist ( user_id bigint(20) NOT NULL, create_time datetime NOT NULL, PRIMARY KEY (user_id,create_time) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO user_blacklist (user_id, create_time) VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04 00:00:00');

-- 2. 模拟kafka数据: -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"} -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}

-- 操作步骤: 当发送第1条kafka数据得到如下输出: | OP| user_id| event_type | current_ts| bl_count | | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | 当再次发送第1条kafka数据得到如下输出: | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |

— SQL 如下:

create table kafka_user_event ( user_id BIGINT, event_type STRING, current_ts timestamp(3), proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', ... );

create table mysql_user_blacklist ( user_id BIGINT, create_time timestamp(3), primary key (user_id,create_time) not enforced ) WITH ( 'connector' = 'jdbc', … );

create view v2_user_event as ( select t1.user_id , t1.event_type , t1.current_ts , count(1) over ( partition by t2.user_id order by t1.proc_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count from kafka_user_event t1 left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.proc_time AS t2 on t1.user_id = t2.user_id where t1.event_type = 'LOGIN' );

select * from v2_user_event;*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-02 16:44:26 658 0
1 条回答
写回答
取消 提交回答
  • 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。

    为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal

    table join 了。*来自志愿者整理的FLINK邮件归档

    2021-12-02 16:58:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载