问题一:Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决
主要是为了实现解析自定义的schema,sink端好输出到下游。
想请教一个问题:
看了上面这个链接关于为每个作业设置一个differnet server id的问题。我看sql可以指定不同的server id,所以有下面这三个疑惑:
1、 如果是不同的stream 任务 的它的server id是不是同一个?
2、不同的stream 任务 同步同一个数据库的不同表是不是没有问题
3、不同的stream 任务 同步同一个数据库的同一张表是不是有问题
*来自志愿者整理的flink邮件归档
参考答案:
See the docs: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370018?spm=a2c6h.13066369.question.83.33bf585fwBnaSf
问题二:flink sql 连接mysql 无数据输出
我在Idea里用flink-jdbc-connector连接mysql, 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? flink版本1.11.2
*来自志愿者整理的flink邮件归档
参考答案:
是不是没有加这一行代码,tableEnv.execute("test");
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370017?spm=a2c6h.13066369.question.84.33bf585fAxGR1X
问题三:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问
我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? 我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > > public static void main(String[] args) { > StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > > String lTable = "CREATE TABLE l_table ( " + > " l_a INT, " + > " l_b string, " + > " l_rt AS localtimestamp, " + > " WATERMARK FOR l_rt AS l_rt " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='5', " + > " 'fields.l_a.min'='1', " + > " 'fields.l_a.max'='5', " + > " 'fields.l_b.length'='5' " + > ")"; > bsTableEnv.executeSql(lTable); > > String rTable = "CREATE TABLE r_table ( " + > " r_a INT, " + > " r_b string, " + > " r_pt AS proctime() " + > ") WITH ( " + > " 'connector' = 'datagen', " + > " 'rows-per-second'='5', " + > " 'fields.r_a.min'='1', " + > " 'fields.r_a.max'='5', " + > " 'fields.r_b.length'='5' " + > ")"; > bsTableEnv.executeSql(rTable); > > String printTable = "CREATE TABLE print (" + > " l_a INT, " + > " l_b string, " + > " l_rt timestamp(3), " + > " r_a INT, " + > " r_b string, " + > " r_pt timestamp(3) " + > ") WITH ( " + > " 'connector' = 'print' " + > ") "; > > bsTableEnv.executeSql(printTable); > > // 运行成功 >// Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > > // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. > Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > > bsTableEnv.executeSql("insert into print select * from " + joinTable); > > } > >}
*来自志愿者整理的flink邮件归档
参考答案:
因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370016?spm=a2c6h.13066369.question.85.33bf585fSSKbxa
问题四:slot问题
一个slot同时只能运行一个线程吗?或者1个slot可以同时并行运行多个线程?
*来自志愿者整理的flink邮件归档
参考答案:
有啊,一个slot本身就可以运行多个线程的。但是不可以运行1个算子结点的多个任务,也不可以运行多个作业中的算子结点的多个任务。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370015?spm=a2c6h.13066369.question.84.33bf585f3XoJwI
问题五:flink sql时间戳字段类型转换问题
数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit
中的kafka消息,里面user_behavior消息例如
{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下
CREATE TABLE user_log (
user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL' 'scan.startup.mode' = 'earliest-offset' );
程序运行会抛错
Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10
我查了一下flink json官方文档 https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
目前只支持两种格式:SQL 和 ISO-8601
其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',
而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'
确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)
请问:
- 像上述时间格式字段在Flink SQL中应该解析成什么类型?
- 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,
pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string
里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',
'yyyy-MM-dd'T'HH:mm:ss'Z'')?
- TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME
ZONE这两种类型在什么情况下会用到?有例子吗?
谢谢!
*来自志愿者整理的flink邮件归档
参考答案:
你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml
这个容器里面的 ts 数据格式是 SQL 格式的。
- 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。
- 是的
- Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。 'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp long 值,或者 'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE ,代表session 时区的 timestamp
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370014?spm=a2c6h.13066369.question.87.33bf585fd5RWZf