Flink升级问题之升级1.10.0失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.11 web ui没有DAG

本地linux下单机版安装的,提交flink代码运行后,正常运行,有日志,但是为啥UI上面却不显示数据接收和发送的条数,求大佬解答

*来自志愿者整理的flink邮件归档



参考答案:

这边说的 UI 上不显示数据接受和发送的条数,能否截图发一下,这样大家能更好的理解这个问题。另外 flink 作业有数据输入和处理吗?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370326?spm=a2c6h.12873639.article-detail.32.29d04378ApxdqJ



问题二:flink stream如何为每条数据生成自增主键

flink stream如何为每条数据生成自增主键??时间戳貌似不行,同一时间戳可能会产生多条数据,无法区分数据的现后顺序。

*来自志愿者整理的flink邮件归档



参考答案:

你是希望每条数据有一个 id,这个 id 是随着数据递增的是啊?或许你可以使用 RichMapFunction[1] 来做这个事情,在每次

mapFunction 中把自定的 id 加进去,然后这个 id 还可以保存到 state 中,这样就算作业 failover 了,自增 id

也不会有问题。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/user_defined_functions.html#rich-functions

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370324?spm=a2c6h.12873639.article-detail.33.29d04378ApxdqJ



问题三:flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示

Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/000009.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/000009.sst

配置和1.9.2 一样: state.backend: rocksdb state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ state.savepoints.dir: hdfs:///flink/savepoints/wc/ state.backend.incremental: true

代码上都有

env.enableCheckpointing(10000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));

是1.10.0 需要做什么特别配置么?

*来自志愿者整理的flink邮件归档



参考答案:

你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root cause。

[1] https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370322?spm=a2c6h.12873639.article-detail.34.29d04378ApxdqJ



问题四:flink 1.11 使用sql写入hdfs无法自动提交分区

我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢

我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。

public static void main(String[] args) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.enableCheckpointing(10000); bsEnv.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE source_kafka (\n" + " appName STRING,\n" + " appVersion STRING,\n" + " uploadTime STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka', \n" + " 'connector.version' = '0.10',\n" + " 'connector.topic' = 'mytest',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'connector.properties.group.id' = 'testGroup',\n" + " 'format.type'='json',\n" + " 'update-mode' = 'append' )";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" + " appName STRING,\n" + " appVersion STRING,\n" + " uploadTime STRING,\n" + " dt STRING," + " h string" + ") PARTITIONED BY (dt,h) WITH (\n" + " 'connector'='filesystem',\n" + " 'path'='hdfs://localhost/tmp/',\n" + " 'sink.partition-commit.policy.kind' = 'success-file', " + " 'format'='orc'\n" + ")"; tEnv.executeSql(sql);

String insertSql = "insert into fs_table SELECT appName ,appVersion,uploadTime, " + " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

tEnv.executeSql(insertSql);

}

*来自志愿者整理的flink邮件归档



参考答案:

默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?

我用同样SQL在我的环境是有的。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370320?spm=a2c6h.12873639.article-detail.35.29d04378ApxdqJ



问题五:flink1.11 sql

flink支持配置hive方言,那么flink可以直接使用hive内自定义的udf、udtf函数吗

*来自志愿者整理的flink邮件归档



参考答案:

我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key 但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题

下面我附录了报错信息和代码。谢谢!

报错附录

Traceback (most recent call last):

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco

return f(*a, **kw)

File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value

format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.

: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

File "mysql_join.py", line 90, in

from_kafka_to_kafka_demo()

File "mysql_join.py", line 22, in from_kafka_to_kafka_demo

st_env.execute("2-from_kafka_to_kafka")

File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute

return JobExecutionResult(self._j_tenv.execute(job_name))

File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call

answer, self.gateway_client, self.target_id, self.name)

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco

raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)

pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'

代码附录

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink

from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime

from pyflink.table.window import Tumble

def from_kafka_to_kafka_demo():

use blink table planner

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env_settings = EnvironmentSettings.Builder().use_blink_planner().build()

st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

register source and sink

register_rides_source(st_env)

register_rides_sink(st_env)

register_mysql_source(st_env)

st_env.sql_update("insert into flink_result select cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")

st_env.execute("2-from_kafka_to_kafka")

def register_rides_source(st_env):

source_ddl = \

"""

create table source1(

id int,

time1 varchar ,

type string

) with (

'connector.type' = 'kafka',

'connector.topic' = 'tp1',

'connector.startup-mode' = 'latest-offset',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'format.type' = 'json',

'connector.version' = 'universal',

'update-mode' = 'append'

)

"""

st_env.sql_update(source_ddl)

def register_mysql_source(st_env):

source_ddl = \

"""

CREATE TABLE dim_mysql (

id int, --

type varchar --

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_test',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.lookup.cache.max-rows' = '5000',

'connector.lookup.cache.ttl' = '1min',

'connector.lookup.max-retries' = '3'

)

"""

st_env.sql_update(source_ddl)

def register_rides_sink(st_env):

sink_ddl = \

"""

CREATE TABLE flink_result (

id int,

type varchar,

rtime bigint,

primary key(id) NOT ENFORCED

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_result',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.write.flush.max-rows' = '5000',

'connector.write.flush.interval' = '2s',

'connector.write.max-retries' = '3'

)

"""

st_env.sql_update(sink_ddl)

if name == 'main':

from_kafka_to_kafka_demo()

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370318?spm=a2c6h.12873639.article-detail.36.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
关系型数据库 MySQL Apache
Flink CDC产品常见问题之直接升级里面的Debezium版本失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
消息中间件 资源调度 Java
flink问题之1.10升级到1.11 提交到yarn失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
40 0
|
3月前
|
NoSQL 关系型数据库 MongoDB
Flink cdc报错问题之升级2.3.0报错如何解决
Flink CDC报错指的是使用Apache Flink的Change Data Capture(CDC)组件时遇到的错误和异常;本合集将汇总Flink CDC常见的报错情况,并提供相应的诊断和解决方法,帮助用户快速恢复数据处理任务的正常运行。
|
5月前
|
存储 SQL 大数据
流计算迎来代际变革:流式湖仓 Flink + Paimon 加速落地、Flink CDC 重磅升级
2023 年 12 月 9 日,Flink Forward Asia 2023 在北京圆满结束。70+ 演讲议题、30+ 一线大厂技术与实践分享,以及座无虚席的现场,无一不昭示着重回线下的 FFA 的行业号召力。
90265 4
流计算迎来代际变革:流式湖仓 Flink + Paimon 加速落地、Flink CDC 重磅升级
|
5月前
|
流计算
从Flink 6.0.6升级到8.0.1时
从Flink 6.0.6升级到8.0.1时
53 3
|
9月前
|
存储 分布式计算 NoSQL
支持 Flink/Gluten/优雅升级...Celeborn0.3.0 介绍
本文介绍 Celeborn 新发布的 0.3.0 版本的重要 Feature,包括但不限于:支持 Flink,支持 Native Spark(Gluten),快速优雅升级,支持 HDFS 等。
544 0
支持 Flink/Gluten/优雅升级...Celeborn0.3.0 介绍
|
10月前
|
数据挖掘 关系型数据库 MySQL
Flink-Learning 实战营在升级!更多精美好礼等你来!
加入升级版 Flink-Learning 实战营,动手体验真实有趣的实战场景。
576 0
Flink-Learning 实战营在升级!更多精美好礼等你来!
|
11月前
|
SQL Oracle NoSQL
Flink CDC 2.4 正式发布,新增 Vitess 数据源,PostgreSQL 和 SQL Server CDC 连接器支持增量快照,升级 Debezium 版本
Flink CDC 2.4 正式发布,新增 Vitess 数据源,PostgreSQL 和 SQL Server CDC 连接器支持增量快照,升级 Debezium 版本
1082 1
Flink CDC 2.4 正式发布,新增 Vitess 数据源,PostgreSQL 和 SQL Server CDC 连接器支持增量快照,升级 Debezium 版本
|
Apache 流计算
《Apache Flink 流式应用中状态的数据结构定义升级》电子版地址
Apache Flink 流式应用中状态的数据结构定义升级
90 0
《Apache Flink 流式应用中状态的数据结构定义升级》电子版地址
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
598 5

相关产品

  • 实时计算 Flink版