Question 1: Flink 1.11 SQL job submission reports an error in the JM
I am submitting a SQL job with Flink 1.11 and see the following exception in the JM log. My job runs several DDL statements via tEnv.executeSql, adds several DML statements via tEnv.createStatementSet, and then calls execute(). What could cause this exception? A second question: although the exception is thrown, the job still starts and runs normally. Why is that? Is it not recommended to use executeSql and StatementSet.execute() in the same job?
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0]
    ... 24 more
Reference answer:
Flink 1.11 changed how StreamTableEnvironment.execute()
and StreamExecutionEnvironment.execute() behave.
In short:
- StreamTableEnvironment.execute() can only execute jobs built via the sqlUpdate and insertInto methods;
- once a Table has been converted to a DataStream, the job can only be executed via StreamExecutionEnvironment.execute();
- the newly introduced TableEnvironment.executeSql() and StatementSet.execute() methods execute SQL jobs directly
(the job is submitted asynchronously), so there is no further need to call StreamTableEnvironment.execute()
or StreamExecutionEnvironment.execute().
See [1] [2] for details.
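For illustration, a minimal sketch of the recommended 1.11 pattern (the table names, schemas, and the datagen/print connectors below are placeholders, not taken from the original job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // DDL: executeSql() runs each statement immediately; no job is submitted for DDL.
        tEnv.executeSql("CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (id BIGINT, name STRING) WITH ('connector' = 'print')");

        // DML: collect all INSERT statements in one StatementSet and submit them as a single job.
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO snk SELECT id, name FROM src");

        // execute() submits the job asynchronously; do not call
        // StreamExecutionEnvironment.execute() on top of this.
        stmtSet.execute();
    }
}

Using executeSql() for DDL and a single StatementSet.execute() for the DML, as above, is the intended usage and should not trigger the exception by itself.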
As for "No operators defined in streaming topology.": if, after submitting a job via
TableEnvironment.executeSql() or StatementSet.execute(), you then call
StreamTableEnvironment.execute() or StreamExecutionEnvironment.execute()
to submit a job again, you will hit the error shown above.
As for "Is it not recommended to use executeSql and StatementSet.execute() in the same job?": the answer is no, that combination is fine; executeSql and StatementSet do not interfere with each other. For the error you are seeing, could you give a more detailed description of how the job is submitted?
[1]
[2]
For more answers to this question, see the original post: https://developer.aliyun.com/ask/372274
Question 2: Flink 1.11 local execution OOM
When starting a job locally in IDEA with Flink 1.11 at parallelism 1, I get the memory-related exception below. I have never explicitly configured any taskmanager memory options before, so why does this happen? Upgrading from 1.10 to 1.11 seems to bring quite a few problems. I tried adding JVM options to increase the direct memory, but it did not help. Could someone please take a look?
Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
Reference answer:
Check whether this is related to the memory configuration changes listed in the release notes [1].
For this specific error, you can follow the hint in the message and increase the relevant memory settings to see whether it helps.
[1] https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements
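As a rough illustration of "increase some memory", here is a minimal sketch for local (IDE / mini-cluster) execution; the option keys come from the error message and Flink's memory options, but the concrete sizes are assumptions to tune for your job:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalMemorySketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Raise the network (direct) memory available to the embedded mini-cluster.
        conf.setString("taskmanager.memory.network.min", "128m");
        conf.setString("taskmanager.memory.network.max", "128m");
        // If user code or its dependencies also allocate direct memory,
        // raise the task off-heap budget as the error message suggests.
        conf.setString("taskmanager.memory.task.off-heap.size", "64m");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... build the pipeline on env, then call env.execute()
    }
}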
For more answers to this question, see the original post: https://developer.aliyun.com/ask/372276
Question 3: Flink 1.10.1 SQL consuming Kafka produces no watermark when parallelism is greater than 1
I am using Flink 1.10.1 with Flink SQL consuming from Kafka. With parallelism 1 the job runs normally, but after changing the parallelism to 2 the watermark metric no longer shows up on the yarn-session web UI and no results are produced. The SQL is as follows:
insert into
x.report.bi_report_fence_common_indicators
select
fence_id,
'finishedOrderCnt' as indicator_name,
TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
count(1) as indicator_val
from
(
select
dt,
fence_id,
fence_coordinates_array,
c.driver_location
from
(
select
*
from
(
select
dt,
driver_location,
r1.f1.fence_info as fence_info
from
(
select
o.dt,
o.driver_location,
MD5(r.city_code) as k,
PROCTIME() as proctime
from
(
select
order_no,
dt,
driver_location,
PROCTIME() as proctime
from
x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
where
_type = 'insert'
and event_code = 'arriveAndSettlement'
) o
LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
) o1
LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
) a
where
fence_info is not null
) c
LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
) as b
where
in_fence(fence_coordinates_array, driver_location)
group by
TUMBLE(dt, INTERVAL '5' MINUTE),
fence_id;
In the table x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner, dt is the watermark column. The DDL is as follows:
CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
_type STRING,
_old_id BIGINT,
id BIGINT,
_old_order_no STRING,
order_no STRING,
_old_event_code STRING,
event_code STRING,
_old_from_state TINYINT,
from_state TINYINT,
_old_to_state TINYINT,
to_state TINYINT,
_old_operator_type TINYINT,
operator_type TINYINT,
_old_passenger_location STRING,
passenger_location STRING,
_old_driver_location STRING,
driver_location STRING,
_old_trans_time STRING,
trans_time STRING,
_old_create_time STRING,
create_time STRING,
_old_update_time STRING,
update_time STRING,
_old_passenger_poi_address STRING,
passenger_poi_address STRING,
_old_passenger_detail_address STRING,
passenger_detail_address STRING,
_old_driver_poi_address STRING,
driver_poi_address STRING,
_old_driver_detail_address STRING,
driver_detail_address STRING,
_old_operator STRING,
operator STRING,
_old_partition_index TINYINT,
partition_index TINYINT,
dt as TO_TIMESTAMP(trans_time),
WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.properties.bootstrap.servers' = '*',
'connector.properties.zookeeper.connect' = '*',
'connector.version' = 'universal',
'format.type' = 'json',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'connector.topic' = 'xxxxx'
)
Reference answer:
First check how many partitions the Kafka topic has and whether every partition is receiving data. The source watermark is the minimum across all partitions/subtasks, so if any partition (or source subtask) never sees data, the overall watermark cannot advance and the tumbling window will never fire or emit results.
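If some partitions are legitimately empty (for example, the topic has fewer active partitions than the source parallelism of 2), one option in newer Flink releases is to mark idle source subtasks so the remaining partitions can still drive the watermark. This is only a sketch under that assumption; the table.exec.source.idle-timeout option may not be available on 1.10.1, in which case the practical fix is to make sure every partition receives data or to keep the source parallelism at the partition count:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Treat a source subtask as idle after 30 s without data so the overall
        // watermark (the minimum across subtasks) can still advance.
        // NOTE: assumed to exist only in newer Flink versions (1.11+), not verified for 1.10.1.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "30 s");
    }
}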
For more answers to this question, see the original post: https://developer.aliyun.com/ask/372278
Question 4: Flink SQL error when connecting to HBase
I created a Flink SQL task with the statement below, and the query fails when it runs. Am I missing some configuration option that makes Flink read the metadata from the "/hbase" znode, while the cluster's HBase configuration actually lives under the "/hbase-unsecure" znode in ZooKeeper?
The Flink version is 1.10, and the HBase table t1 contains data.
create table t1 (
rowkey string,
f1 ROW
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = 't1',
'connector.zookeeper.quorum' = '10.101.236.2:2181,10.101.236.3:2181,10.101.236.4:2181',
'connector.zookeeper.znode.parent' = '/hbase-unsecure',
'connector.write.buffer-flush.max-size' = '10mb',
'connector.write.buffer-flush.max-rows' = '1',
'connector.write.buffer-flush.interval' = '2s'
);
Reference answer:
This looks like a bug in the HBase connector [1]: user-provided HBase configuration such as connector.zookeeper.quorum does not take effect. The bug is fixed in 1.11.0, so you can upgrade. On 1.10.0, a workaround is to put these settings into an hbase-site.xml configuration file and add that file to the HADOOP_CLASSPATH, so that the Flink program also picks up the correct configuration.
[1] https://issues.apache.org/jira/browse/FLINK-17968
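A sketch of the 1.10 workaround described above, assuming the quorum and znode values from the DDL are correct for your cluster: put the ZooKeeper settings into an hbase-site.xml and make sure the directory containing that file is on the HADOOP_CLASSPATH when the job is started.

<!-- hbase-site.xml; the directory containing it must be on HADOOP_CLASSPATH -->
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>10.101.236.2:2181,10.101.236.3:2181,10.101.236.4:2181</value>
  </property>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase-unsecure</value>
  </property>
</configuration>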
For more answers to this question, see the original post: https://developer.aliyun.com/ask/372243
Question 5: Migrating a job from Flink 1.9.0 to 1.10.1
After the LogicalTypeRoot change the job cannot be restored from a checkpoint: No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY. Hi all: when I migrate a job from Flink 1.9.0 to 1.10.1 and the job uses 'group by' style syntax, it can no longer be restored from a checkpoint/savepoint.
Code:
The error is as follows:
switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.InvalidObjectException: enum constant ANY does not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot
    at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
    at org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY
    at java.lang.Enum.valueOf(Enum.java:238)
Reference answer:
A question first: do you mean using 1.10 to restore a savepoint/checkpoint taken by a 1.9 job, or that after migrating to 1.10 the job cannot recover from a failover? If it is the former, Flink SQL currently does not guarantee state compatibility across major versions, so when upgrading from 1.9 to 1.10 the job has to give up its old state and be rerun from scratch.
For more answers to this question, see the original post: https://developer.aliyun.com/ask/372241