The SQL below aggregates with a GROUP BY on a time window plus PRODUCT_ID, so the primary key of the PostgreSQL table has to be WINDOW_START/WINDOW_END plus PRODUCT_ID, otherwise the data cannot be written out in upsert mode. That, however, does not satisfy the business requirement: the business key should be RANK_ID plus WINDOW_START/WINDOW_END. How can this be implemented in Flink?
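For concreteness, before the scenario details, here is the conflict in PostgreSQL terms: a hypothetical sketch of the sink table (names simply mirror the sink DDL below), contrasting the key the 1.10 JDBC upsert path demands with the key the business needs. This sketch is an illustration, not part of the original question:

```sql
-- Key the Flink 1.10 JDBC upsert sink effectively requires:
-- it must match the query's grouping key.
CREATE TABLE product_rank (
  rank_id      BIGINT,
  window_start TIMESTAMP,
  window_end   TIMESTAMP,
  product_id   VARCHAR,
  total_num    BIGINT,
  PRIMARY KEY (window_start, window_end, product_id)
);

-- Key the business needs: one row per rank position per window,
-- overwritten in place as the ranking changes:
-- PRIMARY KEY (rank_id, window_start, window_end)
```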
Scenario: Top 3 best-selling products
Data sample:

```
ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00
```
The SQL logic is as follows:

```sql
-- source
CREATE TABLE ORDER_DATA (
  ORDER_ID VARCHAR,
  USER_ID VARCHAR,
  PRODUCT_ID VARCHAR,
  NUM BIGINT,
  ORDER_TIME TIMESTAMP(3),
  WATERMARK FOR ORDER_TIME AS ORDER_TIME
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'orderData',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'xxxx:2181',
  'connector.properties.bootstrap.servers' = 'xxxx:9092',
  'connector.properties.group.id' = 'flink_sql',
  'format.type' = 'csv',
  'format.derive-schema' = 'true'
);
```
```sql
-- sink
CREATE TABLE PRODUCT_RANK (
  RANK_ID BIGINT,
  WINDOW_START TIMESTAMP(3),
  WINDOW_END TIMESTAMP(3),
  PRODUCT_ID VARCHAR,
  TOTAL_NUM BIGINT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
  'connector.driver' = 'org.postgresql.Driver',
  'connector.table' = 'product_rank',
  'connector.username' = 'xxxxx',
  'connector.password' = 'xxxx',
  'connector.write.flush.max-rows' = '1'
);
```
```sql
INSERT INTO PRODUCT_RANK
SELECT RANK_ID, WINDOW_START, WINDOW_END, PRODUCT_ID, TOTAL_NUM
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM DESC) AS RANK_ID
  FROM (
    SELECT
      TUMBLE_START(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_START,
      TUMBLE_END(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_END,
      SUM(NUM) AS TOTAL_NUM,
      PRODUCT_ID
    FROM ORDER_DATA
    GROUP BY TUMBLE(ORDER_TIME, INTERVAL '1' HOUR), PRODUCT_ID
  )
)
WHERE RANK_ID <= 3;
```
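To see why the table's primary key is forced to match: in Flink 1.10 the legacy JDBC sink infers the upsert key from the query, which for this job is the GROUP BY key (WINDOW_START/WINDOW_END plus PRODUCT_ID), and the PostgreSQL dialect writes with an INSERT ... ON CONFLICT ... DO UPDATE keyed on exactly those columns. A sketch of the shape of the generated statement (illustrative, not quoted from the Flink source):

```sql
INSERT INTO product_rank (rank_id, window_start, window_end, product_id, total_num)
VALUES (?, ?, ?, ?, ?)
-- Upsert key inferred from the query's GROUP BY, not from the table.
ON CONFLICT (window_start, window_end, product_id)
DO UPDATE SET rank_id = EXCLUDED.rank_id, total_num = EXCLUDED.total_num;
```

If the table instead declared PRIMARY KEY (rank_id, window_start, window_end), the ON CONFLICT target would not match any unique constraint and PostgreSQL would reject the statement, which is the limitation described in the answer below.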
*From the volunteer-curated Flink mailing list archive*
Unfortunately, this is indeed not supported at the moment. The problem will be solved by the new TableSink interface introduced in FLIP-95, and is expected to be resolved in Flink 1.11.

*From the volunteer-curated Flink mailing list archive*
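For reference, FLIP-95 era connectors take the upsert key from a PRIMARY KEY declared in the sink DDL instead of inferring it from the query, so the business key can be stated directly. A minimal sketch, assuming the Flink 1.11 JDBC connector option names (credentials remain placeholders):

```sql
CREATE TABLE PRODUCT_RANK (
  RANK_ID BIGINT,
  WINDOW_START TIMESTAMP(3),
  WINDOW_END TIMESTAMP(3),
  PRODUCT_ID VARCHAR,
  TOTAL_NUM BIGINT,
  -- The declared key drives upsert mode; it matches the business key,
  -- not the query's GROUP BY key.
  PRIMARY KEY (RANK_ID, WINDOW_START, WINDOW_END) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/flink-test',
  'table-name' = 'product_rank',
  'username' = 'xxxxx',
  'password' = 'xxxx'
);
```

The PostgreSQL table would then need a matching unique constraint on (rank_id, window_start, window_end) for the ON CONFLICT upsert to apply.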