问题一:flniksql(1.17的版本)写入doris,哪位有过经验?
flniksql(1.17的版本)写入doris,哪位大神有过经验,我现在写doris写不进。想请教一下?
Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:67)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeaderLocal(SimplePacketReader.java:81)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:575)
参考答案:自己写Flink程序,调用Doris的DataStream往Doris写,任务多了就做策略调度,利用Checkpoints做两步提交,https://github.com/apache/doris-flink-connector/issues/253,新出的1.5已经支持多表动态写doris了,可以等3.0 。到时候支持整库同步写到doris和元数据转换
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/577292
问题二:Flink CDC管理内存webui上一直是百分百这个是啥原因导致的?状态后端用的是rocksdb?
Flink CDC管理内存webui上一直是百分百这个是啥原因导致的?状态后端用的是rocksdb?
参考答案:
RocksDB作为状态后端,占据100%,如果程序没问题的话,可以不用管
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/577291
问题三:请教一个Flink问题,如何转变为 DataStream<RowData>?
请教一个Flink问题, 在不使用tabEnv的情况下,DataStream 如何转变为 DataStream?
参考答案:
在不使用Table API的情况下,DataStream可以通过一些转换操作来转变为另一个DataStream。以下是一些常见的转换操作示例:
- map操作:将每个元素应用一个函数进行转换。例如,将字符串转换为大写形式:
DataStream<String> input = ...; // 输入的DataStream DataStream<String> output = input.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } });
- filter操作:根据条件过滤元素。例如,过滤出长度大于5的字符串:
DataStream<String> input = ...; // 输入的DataStream DataStream<String> output = input.filter(new Predicate<String>() { @Override public boolean test(String value) throws Exception { return value.length() > 5; } });
- flatMap操作:将每个元素拆分为多个元素。例如,将一个单词列表拆分为单个字母:
DataStream<List<String>> input = ...; // 输入的DataStream,包含单词列表 DataStream<String> output = input.flatMap(new FlatMapFunction<List<String>, String>() { @Override public void flatMap(List<String> value, Collector<String> out) throws Exception { for (String word : value) { out.collect(word); } } });
- keyBy操作:根据某个键对元素进行分组。例如,按照年龄分组:
DataStream<Person> input = ...; // 输入的DataStream,包含Person对象 DataStream<Tuple2<Integer, Person>> output = input.keyBy(0); // 按照年龄分组,假设Person类中第一个字段是年龄
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/577068
问题四:有没有测试过flink拉取kafka的单topic速度极限是多少?
有没有大佬测试过flink拉取kafka的单topic速度极限是多少?我这边测的是20多M每秒,还能再提高吗?我现在是单topic单partition
参考答案:
调一下liner.ms 和 batch.size
不好意思 是consumer fetch.size
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/577067
问题五:flink1.18.0的jdbc包有吗?
flink1.18.0的jdbc包有吗?我看官网有的是flink-connector-jdbc-3.0.0-1.16.jar这个的包,我连接MySQL的时候报错了:[ERROR] Could not execute SQL statement. Reason:
javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
参考答案:
这个应该是在url后面加个ssl参数就行吧 'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders',
'useSSL' = 'false',
关于本问题的更多回答可点击进行查看: