问题一:求助:FLINKSQL1.10实时统计累计UV
您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
参考回答:
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。
[1] https://issues.apache.org/jira/browse/FLINK-17942
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372089
问题二:flink 1.11 作业执行异常
我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常: org.apache.flink.table.api.TableExecution: Failed to execute sql
caused by : java.lang.IlleagalStateException: No ExecutorFactory found to execute the application. at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
参考回答:
你有没有导入blink的planner呢,加入这个试试
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372087
问题三:FlinkKafkaProducer没有写入多个topic的功能
我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?
参考回答:
方案是ok的,因为Kafka 默认支持写入topic不存在时自动创建[1], 这个配置是默认开启的,所以只用实现下自定义KafkaSerializationSchema就可以满足你的需求。[1] https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372083
问题四:flink 1.10.1 入 hive 格式为parquet
可以提供一份flink1.10 入hive格式为parquet的例子吗?
参考回答:
只要你的hive目标表创建为Parquet格式就行了哈,INSERT语句上跟其他类型的表没有区别的
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372079
问题五:作业升级到flink1.11,idea运行失败
作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application. at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
是哪里出问题了呢
参考回答:
尝试加一下这个依赖 groupId: org.apache.flink artifactId: flink-clients_${scala.binary.version}*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372078