开发者社区> 问答> 正文

flink mysql cdc + hive streaming疑问

我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛

Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id, team_name, create_time, update_time])

我的问题: 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> kafka,然后kafka -> hive streaming? 谢谢! 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么?

sql语句如下

CREATE DATABASE IF NOT EXISTS cdc

DROP TABLE IF EXISTS cdc.team

CREATE TABLE team( team_id BIGINT, team_name STRING, create_time TIMESTAMP, update_time TIMESTAMP, proctime as proctime() ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'team' )

CREATE DATABASE IF NOT EXISTS ods

DROP TABLE IF EXISTS ods.team

CREATE TABLE ods.team ( team_id BIGINT, team_name STRING, create_time TIMESTAMP, update_time TIMESTAMP, ) PARTITIONED BY ( ts_date STRING, ts_hour STRING, ts_minute STRING, ) STORED AS PARQUET TBLPROPERTIES ( 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '1 min', 'sink.partition-commit.policy.kind' = 'metastore,success-file', 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00' )

INSERT INTO ods.team SELECT team_id, team_name, create_time, update_time, my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), my_date_format(create_time,'HH', 'Asia/Shanghai'), my_date_format(create_time,'mm', 'Asia/Shanghai') FROM cdc.team*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:51:34 1495 0
2 条回答
写回答
取消 提交回答
  • hive3 也会报错,请问楼主解决这个问题了吗

    2023-04-11 14:30:09
    赞同 展开评论 打赏
  • hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。*来自志愿者整理的flink

    2021-12-05 17:57:46
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像