I want to use the Flink SQL mysql-cdc connector to sync MySQL table data into Hive in real time. After running the job it throws:
Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id, team_name, create_time, update_time])
My questions:
1. Is this because Hive 2 does not support delete/update? If I switch to Hive 3, would it work?
2. To support this scenario, do I need to add a Kafka layer in between (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? (A sketch of this follows below.)
3. The timestamps produced by the DATE_FORMAT function are in UTC; how can I convert them to GMT+8? Is a UDF the only option? (See the note after the SQL below.)
Thanks!
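For question 2, here is a minimal sketch of the cdc -> kafka leg, assuming the changelog-json format from the flink-cdc-connectors project is on the classpath; the table name, topic, and broker address are placeholders, not taken from the original post:

-- Hypothetical intermediate table: the changelog (including updates and
-- deletes) is serialized as append-only JSON records in Kafka.
CREATE TABLE kafka_team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'team_changelog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'changelog-json'
)

-- cdc -> kafka: the Kafka sink can accept the update/delete changes that
-- the Hive sink rejected.
INSERT INTO kafka_team SELECT team_id, team_name, create_time, update_time FROM cdc.team

A downstream kafka -> hive streaming job would then read this topic; note the Hive table would receive the raw changelog rows, not a merged snapshot.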
The SQL statements are as follows:
CREATE DATABASE IF NOT EXISTS cdc
DROP TABLE IF EXISTS cdc.team
CREATE TABLE team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 'team'
)
CREATE DATABASE IF NOT EXISTS ods
DROP TABLE IF EXISTS ods.team
CREATE TABLE ods.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) PARTITIONED BY (
  ts_date STRING,
  ts_hour STRING,
  ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
)
INSERT INTO ods.team
SELECT
  team_id,
  team_name,
  create_time,
  update_time,
  my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai'),
  my_date_format(create_time, 'HH', 'Asia/Shanghai'),
  my_date_format(create_time, 'mm', 'Asia/Shanghai')
FROM cdc.team
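On question 3: a UDF is not necessarily the only way. Flink SQL has a built-in CONVERT_TZ(string, fromTz, toTz) that shifts a datetime string between time zones; the sketch below assumes create_time casts to the default 'yyyy-MM-dd HH:mm:ss' format (fractional seconds may need trimming first). In newer Flink versions, the table.local-time-zone session option is another angle worth checking.

-- Sketch: shift create_time from UTC to Asia/Shanghai, then format the parts.
SELECT
  DATE_FORMAT(CAST(CONVERT_TZ(CAST(create_time AS STRING), 'UTC', 'Asia/Shanghai') AS TIMESTAMP), 'yyyy-MM-dd') AS ts_date,
  DATE_FORMAT(CAST(CONVERT_TZ(CAST(create_time AS STRING), 'UTC', 'Asia/Shanghai') AS TIMESTAMP), 'HH') AS ts_hour,
  DATE_FORMAT(CAST(CONVERT_TZ(CAST(create_time AS STRING), 'UTC', 'Asia/Shanghai') AS TIMESTAMP), 'mm') AS ts_minute
FROM cdc.team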
Hive 3 can, Hive 2 cannot. Switching to Kafka probably would not help either; before Hive 3, data warehouses generally did not support modifying data. I am not sure this answer is correct; corrections welcome.
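To make the Hive 3 point concrete: Hive 3 allows row-level UPDATE/DELETE only on transactional (ACID) tables, which must be managed tables stored as ORC. A minimal sketch of such a DDL (the table name is illustrative):

CREATE TABLE team_acid (
  team_id BIGINT,
  team_name STRING
) STORED AS ORC
TBLPROPERTIES ('transactional' = 'true')

Note that this concerns Hive's own capability; whether a given Flink version's Hive sink can write to ACID tables is a separate question.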