问题一:flink 1.10 kafka collector topic 配置pattern
请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗?
参考回答:
可以关注下:https://issues.apache.org/jira/browse/FLINK-18449
预计1.12会支持。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371672
问题二:kafkaf To mysql 写入问题
请教两个问题 1) 用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样, 改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225 不知道,用Kafka类的话,如何解决 .connect( new Kafka() .version("0.10") .topic("test-input") 2) 对于timestamp类型字段,用JDBCAppendTableSink 把DataStream 写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误 kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的 jdbc参数类型设置为Types.SQL_TIMESTAMP
参考回答:
估计需要使用Flink 1.11。
1.JSON Format有参数控制 [1]
2.是之前的bug,Flink 1.11应该是不会存在了,不确定1.10.1有没有修。
[1]
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371671
问题三:rocksdb的block cache usage应该如何使用
通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是 flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。
我们的作业一个TM的内存设置如下:
taskmanager.memory.process.size: 23000m taskmanager.memory.managed.fraction: 0.4
ui上显示的Flink Managed MEM是8.48G。
通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。
sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host)
如果维度是host,operator_name,每个operator_name维度是22G。
sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host,operator_name)
请问这个指标应该如何使用?
参考回答:
默认Flink启用了rocksDB 的managed memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block cache均是一个,这样你可以根据taskmanager和subtask_index 作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371670
问题四:flink对task分配slot问题
请教一个问题,当前同一个job下的多个task(不在一个算子链)中,都会存在某一个subTask任务过重,这些subTask会分配到同一个slot下吗?
flink在对subTask分配slot时候,会先判断slot当前存在的任务数,磁盘IO之类的吗?
参考回答:
Flink 在进行 slot sharing 的时候,不会考虑当前 slot 的任务数、磁盘 IO 这些,而是会遵循“相同 task 的多个
subtask 不能分配到同一个 slot 中”这样的一个规则。
举个例子:
如果作业中有 A, B 两个 vertex,并发为 2,那就有 A1, A2, B1, B2 这 4 个 subtask。
那么 A1 和 A2 不能放到一个 slot 中,B1 和 B2 不能够放到一个 slot 中。
所以,slot sharing 的结果只能是 (A1, B1), (A2, B2) 或 (A1, B2), (A2, B1) 这两种情况。
通常情况下,A 和 B 之间的负载可能存在较大差异,而 A1 和 A2、B1 和 B2 之间通常不会有太大差异。
因此,slot sharing 的规则使得每个 slot 中都分配了一个 A 和一个 B,各个 slot 之间的负载大体上是均衡的。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371669
问题五:做实时数仓,sql怎么保证分topic区有序
就是我用 flink sql 通过ddl读取和写入kafka怎么设置并行度呢? flink sql 通过ddl写入kafka怎么自定义分区呢?
这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。
参考回答:
sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。
你可以试试流转表,可以做到细粒度的控制。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371668