Question 1: Flink SQL shared-source issue
I currently have a job whose source table reads from a single topic, but it has 6 sinks: multiple INSERT statements write to the same MySQL table. As I understand it, these INSERT statements should all share the source table, so the Kafka topic only needs to be read once. However, while the job is running I see that some partitions of the topic are consumed quickly while others are consumed slowly. What could be the cause?
The topic has 18 partitions and the job runs with a parallelism of 18. *From the Flink mailing list archive, organized by volunteers
Reference answer:
- Whether the source is actually shared can be checked in the topology graph in the web UI.
- When catching up on a backlog, or when downstream operators consume at different speeds, uneven consumption across partitions is entirely normal.
- You can increase the sink parallelism and enlarge the buffer size to mitigate this. *From the Flink mailing list archive, organized by volunteers
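One detail worth checking: if the six INSERTs are submitted through separate tEnv.executeSql() calls, each call becomes its own job and the source is not shared at all. A minimal sketch, assuming Flink 1.11+ Table API (the table names kafka_source and mysql_sink are placeholders, not from the original question), of grouping the INSERTs so they compile into a single job:

```java
// Sketch (assumes Flink 1.11+; kafka_source and mysql_sink are
// placeholder names): a StatementSet compiles all INSERTs into one
// job graph, so the Kafka source table is scanned once and shared.
StatementSet set = tEnv.createStatementSet();
set.addInsertSql("INSERT INTO mysql_sink SELECT id, name FROM kafka_source WHERE kind = 'a'");
set.addInsertSql("INSERT INTO mysql_sink SELECT id, name FROM kafka_source WHERE kind = 'b'");
set.execute();  // one job, one shared source scan
```

Whether the scan is actually shared can then be confirmed in the web UI topology, as the answer above suggests.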
For more answers to this question, click here:
https://developer.aliyun.com/ask/370088?spm=a2c6h.13066369.question.51.33bf585fY6UfVZ
Question 2: Running Flink SQL from code throws an error
Error:
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
The message suggests that more than one factory class implementing DynamicTableSinkFactory was found on the classpath.
The code is as follows:

```java
package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDC2ss2 {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        String src_sql = "CREATE TABLE userss (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '10.12.5.37',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'database-name' = 'rpt',\n" +
                "  'table-name' = 'users'\n" +
                ")";
        tEnv.executeSql(src_sql);
        // create the sink table
        String sink = "CREATE TABLE sink (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'table-name' = 'sink'\n" +
                ")";
        String to_print_sql = "insert into sink select user_id, user_nm from userss";
        tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute();
    }
}
```
Full error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
Table options are:
'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
Ambiguous factory classes are:
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 19 more
Process finished with exit code 1
*From the Flink mailing list archive, organized by volunteers
Reference answer:
Which version are you using? Have you implemented your own DynamicTableSinkFactory whose factoryIdentifier method also returns the JDBC connector identifier? *From the Flink mailing list archive, organized by volunteers
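For background: Flink discovers table factories through Java's SPI (ServiceLoader) and fails when two factories on the classpath report the same identifier, which typically means two jars (for example, two versions of the JDBC connector, or a shaded copy inside a fat jar) each register a factory. The grouping logic can be illustrated with a stdlib-only analogue, using FileSystemProvider as the service interface; everything below is illustrative, not Flink's actual code.

```java
import java.nio.file.spi.FileSystemProvider;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stdlib-only analogue of Flink's factory discovery: services are loaded
// via SPI and grouped by an identifier; more than one implementation per
// identifier is the "ambiguous factory" situation from the error above.
public class FactoryScan {

    // Group installed FileSystemProviders by their URI scheme, the way
    // FactoryUtil groups table factories by factoryIdentifier().
    public static Map<String, List<String>> byIdentifier() {
        return FileSystemProvider.installedProviders().stream()
                .collect(Collectors.groupingBy(
                        FileSystemProvider::getScheme,
                        Collectors.mapping(p -> p.getClass().getName(),
                                Collectors.toList())));
    }

    public static void main(String[] args) {
        byIdentifier().forEach((id, impls) -> {
            String status = impls.size() > 1 ? "AMBIGUOUS" : "ok";
            System.out.println(id + " -> " + impls + " [" + status + "]");
        });
    }
}
```

In the real Flink case the fix is to keep exactly one jar that registers the 'jdbc' identifier on the classpath, so checking the dependency tree for duplicate connector jars is a good first step.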
For more answers to this question, click here:
https://developer.aliyun.com/ask/370089?spm=a2c6h.13066369.question.52.33bf585fUbmHBN
Question 3: Dynamically loading patterns in Flink CEP
Hi, in our projects the rules often change. How do we usually load these rules dynamically? Does Flink CEP have a native API for dynamically loading rules? *From the Flink mailing list archive, organized by volunteers
Reference answer:
Hi,
Flink CEP does not support dynamically loading rules yet. There is a JIRA issue tracking this requirement:
https://issues.apache.org/jira/browse/FLINK-7129
You can follow that JIRA for the latest progress. *From the Flink mailing list archive, organized by volunteers
For more answers to this question, click here:
https://developer.aliyun.com/ask/370090?spm=a2c6h.13066369.question.55.33bf585fYe37VV
Question 4: What is the impact of a ZooKeeper leader change on Flink?
As the title says. *From the Flink mailing list archive, organized by volunteers
Reference answer:
For more answers to this question, click here:
https://developer.aliyun.com/ask/370091?spm=a2c6h.13066369.question.54.33bf585fJL0Bmf
Question 5: Flink SQL TUMBLE window function throws an error
Has anyone hit the following exception when using Flink SQL's TUMBLE function and converting the result table to a stream?
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(Lorg/apache/flink/api/common/functions/AggregateFunction;Lorg/apache/flink/streaming/api/functions/windowing/WindowFunction;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/streaming/api/datastream/SingleOutputStreamOperator;
at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:214)
*From the Flink mailing list archive, organized by volunteers
Reference answer:
Which Flink version are you on? From the error it looks like you are using the legacy planner; try the blink planner instead. *From the Flink mailing list archive, organized by volunteers
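Switching planners is a small change when building the table environment. A sketch, assuming Flink 1.9–1.13 (the versions where both planners ship and useBlinkPlanner() exists):

```java
// Sketch (Flink 1.9-1.13): select the blink planner instead of the legacy
// one. The NoSuchMethodError above comes from the legacy planner's window
// translation and often also indicates mixed flink-* jar versions.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```

It is also worth confirming that all flink-* dependencies use the same version, since a NoSuchMethodError between Flink's own modules usually points to a version mismatch.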
For more answers to this question, click here:
https://developer.aliyun.com/ask/370092?spm=a2c6h.13066369.question.55.33bf585fKV0Tg2