问题一:State里面用guava Cache
大家好,我想使用一个 ValueState[Cache]的状态,但是发现这个状态的value 没办法更新,
比如我在map里面每次往cache里面put一个字符串,然后update这个state,输出cache的长度,为什么每次输出长度都是1
参考回答:
你好,为什么需要在 State 里面再用 cache 呢?单纯的 State 不能满足需求吗?需求是什么呢?
另外,除了 ValueState,其他的 ListState/MapState 能否满足你的需求呢?
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372121
问题二:如何在Flink SQL中使用周期性水印?
大佬们, 请教下如何在Flink SQL中使用周期性的水印。 我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。
参考回答:
这个问题我理解其实和周期性水印没有关系,是属于 idle source
的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1][1]:
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372110
问题三:flink 1.11 on kubernetes 构建失败
按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
Starting Task Manager sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file system sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file system /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml: Permission denied sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file system /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system [ERROR] The execution result is empty. [ERROR] Could not get JVM parameters and dynamic configurations properly.
是否有遇到同样的问题,支个招
参考回答:
你是不是对 /opt/flink/conf 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml 等文件,而这个挂载的目录其实是不可写的。
直接修改configmap里面的内容,这样挂载时候就会自动更新了。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372107
问题四:flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张
场景: canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序; 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢? 例如mysql实例db1中有表test, statusCREATE TABLE test
( id
int(11) NOT NULL, name
varchar(255) NOT NULL, time
datetime NOT NULL, status
int(11) NOT NULL, PRIMARY KEY (id
)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE TABLE status
( status
int(11) NOT NULL, name
varchar(255) NOT NULL, PRIMARY KEY (status
)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE TABLE test (id
INT,name
VARCHAR(255),time
TIMESTAMP(3),status
INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (status
INT,name
VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='earliest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true');
参考回答:
另外,我理解下你的需求是 db1.test 同步到 db2.test, db1.status 同步到 db2.status?
多表的有序同步是指?
我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
into 到 db2.status mysql table就行了。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372093
问题五:flink 1.11 connector jdbc 依赖解析失败
我看下了maven仓库里有的[1], 官网文档里也有下载链接[2],是不是pom里的依赖没有写对?1.11 jdbc connector 的module名从 flink-jdbc 规范到了 flink-connector-jdbc。 [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/ [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
参考回答:
感谢提醒,我是在https://mvnrepository.com/这个上面搜没搜到对应的包的,不过,module名改成flink-connector-jdbc,可以了,感谢提醒
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372091