flink sql问题之连接HBase报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 sql作业提交JM报错


我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more


参考回答:

1.11 对 StreamTableEnvironment.execute()

和 StreamExecutionEnvironment.execute() 的执行方式有所调整,

简单概述为:

  1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
  2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
  3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业

(异步提交作业),不需要再调用 StreamTableEnvironment.execute()

或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]

对于 “No operators defined in streaming topology.”,如果使用

TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用

StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()

提交作业,就会出现前面的错误。

对于

“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372274


问题二:flink 1.11 local execution oom问题


我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。

Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...


参考回答:

这个问题可以看下是否和 releasenote[1] 中 memory configuration

相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看

[1]

https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372276


问题三:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat


本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:

insert into

x.report.bi_report_fence_common_indicators

select

fence_id,

'finishedOrderCnt' as indicator_name,

TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,

count(1) as indicator_val

from

(

select

dt,

fence_id,

fence_coordinates_array,

c.driver_location

from

(

select

*

from

(

select

dt,

driver_location,

r1.f1.fence_info as fence_info

from

(

select

o.dt,

o.driver_location,

MD5(r.city_code) as k,

PROCTIME() as proctime

from

(

select

order_no,

dt,

driver_location,

PROCTIME() as proctime

from

x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner

where

_type = 'insert'

and event_code = 'arriveAndSettlement'

) o

LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no

) o1

LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k

) a

where

fence_info is not null

) c

LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE

) as b

where

in_fence(fence_coordinates_array, driver_location)

group by

TUMBLE(dt, INTERVAL '5' MINUTE),

fence_id;

其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(

_type STRING,

_old_id BIGINT,

id BIGINT,

_old_order_no STRING,

order_no STRING,

_old_event_code STRING,

event_code STRING,

_old_from_state TINYINT,

from_state TINYINT,

_old_to_state TINYINT,

to_state TINYINT,

_old_operator_type TINYINT,

operator_type TINYINT,

_old_passenger_location STRING,

passenger_location STRING,

_old_driver_location STRING,

driver_location STRING,

_old_trans_time STRING,

trans_time STRING,

_old_create_time STRING,

create_time STRING,

_old_update_time STRING,

update_time STRING,

_old_passenger_poi_address STRING,

passenger_poi_address STRING,

_old_passenger_detail_address STRING,

passenger_detail_address STRING,

_old_driver_poi_address STRING,

driver_poi_address STRING,

_old_driver_detail_address STRING,

driver_detail_address STRING,

_old_operator STRING,

operator STRING,

_old_partition_index TINYINT,

partition_index TINYINT,

dt as TO_TIMESTAMP(trans_time),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '*',

'connector.properties.zookeeper.connect' = '*',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'xxxxx'

)


参考回答:

可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372278


问题四:flink sql连接HBase报错


在使用如下语句创建Flink SQL任务,执行查询报错,我想问下,是我遗漏了什么配置项导致flink在“/hbase” node去取元数据,实际集群的hbase配置是在zk的“/hbase-unsecure” node下的

Flink 版本是1.10,hbase的t1表有数据

create table t1 ( rowkey string, f1 ROW ) WITH ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = 't1', 'connector.zookeeper.quorum' = '10.101.236.2:2181,10.101.236.3:2181,10.101.236.4:2181', 'connector.zookeeper.znode.parent' = '/hbase-unsecure', 'connector.write.buffer-flush.max-size' = '10mb', 'connector.write.buffer-flush.max-rows' = '1', 'connector.write.buffer-flush.interval' = '2s' );


参考回答:

这应该是碰到了Hbase connector的bug [1], 用户配置的hbaseconf 相关的参数,如connector.zookeeper.quorum 不会生效,这个 bug 在1.11.0 已经修复,可以升级下版本。 在1.10.0版本上一种 walkwaround 的方式是把把这些参数放在 hbase-site.xml 的配置文件中,然后将把配置文件添加到 HADOOP_CLASSPATH中,这样Flink程序也可以加载到正确的配置。

[1] https://issues.apache.org/jira/browse/FLINK-17968 https://issues.apache.org/jira/browse/FLINK-17968


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372243


问题五:作业从flink1.9.0迁移到1.10.1


LogicalTypeRoot变更后无法从CP恢复:No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY 各位好:    当我把作业从flink1.9.0迁移到1.10.1,且作业中使用了'group by'形式的语法时,会导致无法从cp/sp恢复,

代码:

报错如下:

switched from RUNNING to FAILED.switched from RUNNING to FAILED.java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 moreCaused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 moreCaused by: java.io.InvalidObjectException: enum constant ANY does not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555) at org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169) at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114) ... 15 moreCaused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY at java.lang.Enum.valueOf(Enum.java:238)


参考回答:

问一下,你是指用1.10去恢复 1.9 作业的 savepoint/checkpoint 吗?还是指迁移到 1.10 后,无法从 failover 中恢复? 如果是前者的话,Flink SQL 目前没有保证跨大版本的 state 兼容性。所以当你从 1.9 升级到 1.10 时,作业需要放弃状态重跑。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372241

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 关系型数据库 MySQL
这样的SQL执行为什么不会报错?optimizer_trace深度历险
【10月更文挑战第12天】本文探讨了一条看似错误但实际上能成功执行的SQL语句,通过开启MySQL的优化器追踪功能,详细分析了SQL的执行过程,揭示了子查询被优化器解析为连接操作的原因,最终解释了为何该SQL不会报错。文章不仅增进了对SQL优化机制的理解,也展示了如何利用优化器追踪解决实际问题。
|
4月前
|
SQL 数据库
SQL解析相关报错
SQL解析相关报错
54 5
|
1月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
130 14
|
3月前
|
SQL 关系型数据库 MySQL
|
4月前
|
关系型数据库 MySQL Nacos
nacos启动报错 load derby-schema.sql error
这篇文章描述了作者在使用Nacos时遇到的启动错误,错误提示为加载derby-schema.sql失败,作者通过将数据库从Derby更换为MySQL解决了问题。
nacos启动报错 load derby-schema.sql error
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
71 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
163 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
87 6

相关产品

  • 实时计算 Flink版