Flink问题之State 0点清除如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:State 0点清除的问题


大家好:       我想问下,在ProcessAllWindowFunction中,在每天的0点清除state如何清除?


参考回答:

如果你需要精确的控制每天 0 点清除 state 的话,或许你可以考虑使用 processFunction[1], 然后自己使用 timer

实现相关逻辑

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/process_function.html


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372163


问题二:flink1.9状态及作业迁移


flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


参考回答:

Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。

[1] https://issues.apache.org/jira/browse/FLINK-5763


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372165


问题三:Flink SQL处理Array型的JSON


[
{ "id": 1},
{ "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。


参考回答:

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372167


问题四:flink 同时sink hbase和hive,hbase少记录


flink订阅kafka消息,同时sink到hbase和hive中, 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条

query: streamTableEnv.executeSql( """ | |CREATE TABLE hbase_table ( | rowkey VARCHAR, | cf ROW(sex VARCHAR, age INT, created_time VARCHAR) |) WITH ( | 'connector.type' = 'hbase', | 'connector.version' = '2.1.0', | 'connector.table-name' = 'ods:user_hbase6', | 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181', | 'connector.zookeeper.znode.parent' = '/hbase', | 'connector.write.buffer-flush.max-size' = '1mb', | 'connector.write.buffer-flush.max-rows' = '1', | 'connector.write.buffer-flush.interval' = '0s' |) |""".stripMargin)

val statementSet = streamTableEnv.createStatementSet() val insertHbase = """ |insert into hbase_table |SELECT | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid, | ROW(sex, age, created_time ) as cf |FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table) | |""".stripMargin

statementSet.addInsertSql(insertHbase)

val insertHive = """ | |INSERT INTO odsCatalog.ods.hive_table |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH') |FROM kafka_table | |""".stripMargin statementSet.addInsertSql(insertHive)

statementSet.execute()

是因为参数'connector.write.buffer-flush.max-size' = '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下: Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1

并且,按照官网文档 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html

设置参数也不识别,报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

看了一下源码, org.apache.flink.table.descriptors.HBaseValidator public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase"; public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0"; public static final String CONNECTOR_TABLE_NAME = "connector.table-name"; public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum"; public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows"; public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval"; 参数还是老参数


参考回答:

(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 BufferredMutator 做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval 设置为 0s 时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval 设置成 1s 应该就能看到数据了。

(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1][1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372246


问题五:使用Flink Array Field Type


Flink 1.10.0 问题描述:source表中有个test_array_string ARRAY 字段,在DML语句用test_array_string[0]获取数组中的值会报数组越界异常。另外测试过Array 也是相同错误,Array ,Array 等类型也会报数组越界问题。 请问这是Flink1.10的bug吗?

SQL: CREATETABLE source ( …… test_array_string ARRAY ) WITH ( 'connector.type'='kafka', 'update-mode'='append', 'format.type'='json' …… );

CREATETABLE sink( v_string string ) WITH ( …… );

INSERTINTO sink SELECT test_array_string[0] as v_string from source;

kafka样例数据:{"id":1,"test_array_string":["ff”]}

Flink 执行的时候报以下错误: java.lang.ArrayIndexOutOfBoundsException: 33554432 at org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598) at org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590) at org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534) at org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117) at StreamExecCalc$9.processElement(UnknownSource) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SourceConversion$1.processElement(UnknownSource) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:408) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)


参考回答:

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where CARDINALITY(arr) >= 5 这种方式防止数组访问越界。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372255

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL Java API
flink问题之state过期设置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
423 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
66 5
|
1月前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
20 0
|
1月前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
59 0
|
1月前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
44 0
|
3月前
|
消息中间件 应用服务中间件 API
Flink四大基石——3.State
Flink四大基石——3.State
51 1
|
3月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
48 1
|
6月前
|
SQL 分布式数据库 Apache
Flink问题之实现state定时输出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
6月前
|
存储 消息中间件 资源调度
Flink state 详解
Flink state 详解
69 0
|
存储 关系型数据库 API
Flink State 有可能代替数据库吗?
State 的引入使得实时应用可以不依赖外部数据库来存储元数据及中间数据,部分情况下甚至可以直接用 State 存储结果数据,这让业界不禁思考: State 和 Database 是何种关系?有没有可能用 State 来代替数据库呢?

相关产品

  • 实时计算 Flink版