问题一:kafka-connect json格式适配问题?
kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储
Flink定义输出表结构:
CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')
输出的数据格式示例:
{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
但是kafka-connect-jdbc的json格式需要schema和payload,示例:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "name"
}
],
"optional": true,
"name": "user"
},
"payload": {
"id": 1,
"name": "admin"
}
}
请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?
当前Flink处理sql:
INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' MINUTE)*来自志愿者整理的flink邮件归档
参考答案:
你需要在 DDL 和 query 上都补上 schema 和 payload:
CREATE TABLE print_table \
(schema
STRING, payload
ROW<total_count BIGINT, username STRING,
update_time TIMESTAMP(6)>) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')
-- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload
INSERT INTO output
SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
update_time) as payload
FROM ...
Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到
mysql 不是很方便么?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371520?spm=a2c6h.12873639.article-detail.17.29d04378ApxdqJ
问题二:Blink的Batch模式的并行度问题
Flink 目前的blink table planner batch mode
(读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,
但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,
那么如何能扩大并行度来优化性能呢?*来自志愿者整理的flink邮件归档
参考答案:
可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371518?spm=a2c6h.12873639.article-detail.18.29d04378ApxdqJ
问题三:Could not find any factory for identifier 'kafka'
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 编译的jar包是jar-with-dependencies的
代码片段: public String ddlSql = String.format("CREATE TABLE %s (\n" + " number BIGINT,\n" + " msg STRING,\n" + " username STRING,\n" + " update_time TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'format' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", tableName, topic, servers, group);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);
报错信息: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ... 33 more
参考了这个 http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
附上pom依赖: org.apache.flink flink-java ${flink.version} org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} org.apache.flink flink-table-api-java ${flink.version} org.apache.flink flink-connector-kafka_2.12 ${flink.version} org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} org.apache.flink flink-json ${flink.version} *来自志愿者整理的flink邮件归档
参考答案:
可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?
如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1] [1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371517?spm=a2c6h.12873639.article-detail.19.29d04378ApxdqJ
问题四:flink 聚合 job 重启问题
请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。 但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.*来自志愿者整理的flink邮件归档
参考答案:
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371515?spm=a2c6h.12873639.article-detail.20.29d04378ApxdqJ
问题五:Flink CPU利用率低
想问下大佬们 Flink的cpu利用率这么低吗 0.012?*来自志愿者整理的flink邮件归档
参考答案:
Flink CPU利用率的高低主要还是取决于你的任务中的业务逻辑,框架本身的CPU占用是很低的试想一下,如果你的任务是计算非常简单(或则就是sleep),那整个TM的CPU利用率就很低了,约等于框架占用的如果是一个计算很密集的(或者就是死循环),那TM CPU利用率就是取决于你的slot数量了,2个slot就是200%的CPU利用率 *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371512?spm=a2c6h.12873639.article-detail.21.29d04378ApxdqJ