问题一:flink1.9读取阿里Mq问题
flink1.9读取阿里RocketMQ 如何设置AccessKey,SecretKey 参数
finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();
参考回答:
社区版本的 Flink 应该默认没有和 RocketMQ 连接的 Connector,在 RocketMQ 的社区项目中看到和 Flink 整合的模块:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
你说的 AccessKey,SecretKey 参数应该是 ACL 权限校验,看了代码应该是不支持的,不过可以自己去进行扩展。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370131
问题二:table execution-options 能否通过 -yd 生效
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html#execution-options // instantiate table environment TableEnvironment tEnv = ...
// access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); configuration.setString("table.exec.mini-batch.size", "5000");
请问下,table的这些参数是不是只能在代码里面设置,通过 -yd 传入可否生效呢?
参考回答:
如果你是写代码来使用TableEnvironment的,
你要显示的在代码中塞进TableConfig中:
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.addAll(GlobalConfiguration.loadConfiguration());
CC: @Yang Wang da...@gmail.com
GlobalConfiguration是个internal的类,有没有public
API获取对应的Configuration?
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370130
问题三:flink1.10在通过TableFunction实现行转列时,Row一直是空
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, 那么在eval方法接收到的就是Row[], 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
通过下面的步骤和代码可还原车祸场景: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
代码1:Problem.java package com.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;
/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode2", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable (\n" + " action ARRAY<\n" + " ROW<" + " actionID STRING,\n" + " actionName STRING\n" + " >\n" + " >\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json'\n" + ")"; bsEnv.sqlUpdate(ddlSource);
// Table table = bsEnv.sqlQuery("select action
from actionTable"); Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(action
)) as T(word
)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print("==tb==");
bsEnv.execute("ARRAY tableFunction Problem"); } }
代码2:ExplodeFunction.java package com.flink;
import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row;
import java.util.ArrayList; import java.util.Arrays;
public class ExplodeFunction extends TableFunction {
public void eval(Row[] values) { System.out.println(values.length); if (values.length > 0) { for (Row row : values) { if (row != null) {// 这里debug出来的row总是空 ArrayList list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { Object field = row.getField(i); list.add(field); }
collector.collect(Row.of(Arrays.toString(list.toArray()))); } } } } }
参考回答:
当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 https://issues.apache.org/jira/browse/FLINK-17855
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370129
问题四:rocksdb的block cache usage应该如何使用
通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是 flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。
我们的作业一个TM的内存设置如下:
taskmanager.memory.process.size: 23000m taskmanager.memory.managed.fraction: 0.4
ui上显示的Flink Managed MEM是8.48G。
通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。
sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host)
如果维度是host,operator_name,每个operator_name维度是22G。
sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"}) by (host,operator_name)
请问这个指标应该如何使用?
参考回答:
默认Flink启用了rocksDB 的managed memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block cache均是一个,这样你可以根据taskmanager和subtask_index 作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370128
问题五:kafkaf To mysql 写入问题
请教两个问题 1) 用下面的代码消费kafka 发生序列化异常时,会发生JOB反复重试,重启后也是这样, 改用FlinkKafkaConsumer010类的话,有相关的解决方法,参照https://stackoverflow.com/questions/51301549/how-to-handle-exception-while-parsing-json-in-flink/51302225 不知道,用Kafka类的话,如何解决 .connect( new Kafka() .version("0.10") .topic("test-input") 2) 对于timestamp类型字段,用JDBCAppendTableSink 把DataStream 写入到mysql时,会发下面的错误LocalTimeStamp到Timestamp的转型错误 kafka消息是avro格式,字段类型设置为timestamp(3),我是把System.currentTimeMillis()写入到kafka中的 jdbc参数类型设置为Types.SQL_TIMESTAMP thanks
参考回答:
估计需要使用Flink 1.11。
1.JSON Format有参数控制 [1]
2.是之前的bug,Flink 1.11应该是不会存在了,不确定1.10.1有没有修。
[1]
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370127