问题一:Flink SQL处理Array型的JSON
Kafka中的JSON结构是个Array例子如下。
[
{ "id": 1},
{ "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。
*来自志愿者整理的flink邮件归档
参考答案:
可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370174?spm=a2c6h.12873639.article-detail.93.6f9243783Lv0fl
问题二:flink 同时sink hbase和hive,hbase少记录
flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( | rowkey VARCHAR, | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( | 'connector.type' = 'hbase', | 'connector.version' = '2.1.0', | 'connector.table-name' = 'ods:user_hbase6', | 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.zookeeper.znode.parent' = '/hbase', | 'connector.write.buffer-flush.max-size' = '1mb', | 'connector.write.buffer-flush.max-rows' = '1', | 'connector.write.buffer-flush.interval' = '0s' |) |""".stripMargin)
val statementSet = streamTableEnv.createStatementSet() val insertHbase = """ |insert into hbase_table |SELECT | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, | ROW(sex, age, created_time ) as cf |FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table) | |""".stripMargin
statementSet.addInsertSql(insertHbase)
val insertHive = """ | |INSERT INTO odsCatalog.ods.hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin statementSet.addInsertSql(insertHive)
statementSet.execute()
是因为参数'connector.write.buffer-flush.max-size' = '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下: Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
并且,按照官网文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
设置参数也不识别,报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
看了一下源码, org.apache.flink.table.descriptors.HBaseValidator public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase"; public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0"; public static final String CONNECTOR_TABLE_NAME = "connector.table-name"; public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum"; public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval"; 参数还是老参数
*来自志愿者整理的flink邮件归档
参考答案:
(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 BufferredMutator 做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval 设置为 0s 时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval 设置成 1s 应该就能看到数据了。
(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1][1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370173?spm=a2c6h.12873639.article-detail.94.6f9243783Lv0fl
问题三:【Flink Join内存问题】
我看源码里写到JoinedStreams:
也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
那么有什么预防措施呢?
将key值多的一边进行打散?
Right now, the join is being evaluated in memory so you need to ensure that the number
- of elements per key does not get too high. Otherwise the JVM might crash.
*来自志愿者整理的flink邮件归档
参考答案:
egular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370172?spm=a2c6h.12873639.article-detail.95.6f9243783Lv0fl
问题四:flink 1.11 createTemporaryTable 指定 rowtime 字段报 Fie
使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable 为事件时间,程序包 Field null does not exist 错误,是我用法有问题? 看了下 https://issues.apache.org/jira/browse/FLINK-16160 https://issues.apache.org/jira/browse/FLINK-16160 这个 issue 是解决的这个问题吗?
tableEnv.connect(kafka) .withSchema( new Schema().field("searchTime", DataTypes.TIMESTAMP()).rowtime(rowtime); ) .withFormat( new Json().failOnMissingField(false) ) .createTemporaryTable("tablename");
*来自志愿者整理的flink邮件归档
参考答案:
Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用
executeSql(ddl) 来替代。
社区计划在 1.12 中系统地重构和修复 connect API 。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370169?spm=a2c6h.12873639.article-detail.96.6f9243783Lv0fl
问题五:单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及s
单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?请问下,有没有大佬做过类似的事情?
另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序?
*来自志愿者整理的flink邮件归档
参考答案:
你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。
datastream.map(new MySQLSinkMapFunction()).addSink(new
FlinkKafkaProducer()).
也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。
另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370168?spm=a2c6h.12873639.article-detail.97.6f9243783Lv0fl