开发者社区> 问答> 正文

flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert

"请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多, 是目前不支持还是我使用的方法不对呢? 版本:flink 1.11.1

关键的2个sql如下

create table open_and_close_terminal_minute_1 ( request_date varchar ,terminal_no varchar ,logon_time varchar ,logout_time varchar ,insert_time varchar ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED ) with ( 'connector' = 'jd…… 'url' = 'jdbc:mys……se', 'table-name' = 'c……, 'driver' = 'com.m…… 'username' = 'ana…… 'password' = 'ana…… 'sink.buffer-flus…… )

upsert into open_and_close_terminal_minute_1 select request_date ,terminal_no ,logon_time ,logout_time ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date ,cast(terminalNo as varchar) as terminal_no ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time from caslog INNER join itoa_b_terminal_shop for system_time as of caslog.proc_time on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey where errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo

)"*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:31:16 1039 0
1 条回答
写回答
取消 提交回答
  • 这个版本是支持的。 其中插入语句是 "insert into " 而不是 “update into”?

    *来自志愿者整理的flink邮件归档

    2021-12-06 11:21:21
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载