Flink构建问题之flink 1.11 on kubernetes构建失败如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 on kubernetes 构建失败

按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。

目前看差别在于1.11启动jm和tm是通过args:

["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed

本地挂载的flink-configuration-configmap.yaml导致失败。

1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。

command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\

while :;

do

if [[ -f $(find log -name 'jobmanager.log' -print -quit) ]];

then tail -f -n +1 log/jobmanager.log;

fi;

done"]

如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。 1.11 版本的部署方式如果有大佬可以走通的,求分享。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions

[3]

https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh

*来自志愿者整理的flink邮件归档



参考答案:

sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改

才会这样[1]

你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。

如果不是,需要你把你的yaml发出来

[1].

https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

[2].

https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370317?spm=a2c6h.12873639.article-detail.37.29d04378ApxdqJ



问题二:flinksql1.11中主键声明的问题

我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key 但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题

下面我附录了报错信息和代码。谢谢!

报错附录

Traceback (most recent call last):

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco

return f(*a, **kw)

File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value

format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.

: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

File "mysql_join.py", line 90, in

from_kafka_to_kafka_demo()

File "mysql_join.py", line 22, in from_kafka_to_kafka_demo

st_env.execute("2-from_kafka_to_kafka")

File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute

return JobExecutionResult(self._j_tenv.execute(job_name))

File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call

answer, self.gateway_client, self.target_id, self.name)

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco

raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)

pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'

代码附录

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink

from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime

from pyflink.table.window import Tumble

def from_kafka_to_kafka_demo():

use blink table planner

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env_settings = EnvironmentSettings.Builder().use_blink_planner().build()

st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

register source and sink

register_rides_source(st_env)

register_rides_sink(st_env)

register_mysql_source(st_env)

st_env.sql_update("insert into flink_result select cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")

st_env.execute("2-from_kafka_to_kafka")

def register_rides_source(st_env):

source_ddl = \

"""

create table source1(

id int,

time1 varchar ,

type string

) with (

'connector.type' = 'kafka',

'connector.topic' = 'tp1',

'connector.startup-mode' = 'latest-offset',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'format.type' = 'json',

'connector.version' = 'universal',

'update-mode' = 'append'

)

"""

st_env.sql_update(source_ddl)

def register_mysql_source(st_env):

source_ddl = \

"""

CREATE TABLE dim_mysql (

id int, --

type varchar --

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_test',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.lookup.cache.max-rows' = '5000',

'connector.lookup.cache.ttl' = '1min',

'connector.lookup.max-retries' = '3'

)

"""

st_env.sql_update(source_ddl)

def register_rides_sink(st_env):

sink_ddl = \

"""

CREATE TABLE flink_result (

id int,

type varchar,

rtime bigint,

primary key(id) NOT ENFORCED

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_result',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.write.flush.max-rows' = '5000',

'connector.write.flush.interval' = '2s',

'connector.write.max-retries' = '3'

)

"""

st_env.sql_update(sink_ddl)

if name == 'main':

from_kafka_to_kafka_demo()

*来自志愿者整理的flink邮件归档



参考答案:

你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370315?spm=a2c6h.12873639.article-detail.38.29d04378ApxdqJ



问题三:flink1.11启动问题

Flink1.11启动时报错: java.lang.LinkageError: ClassCastException: attempting to castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125) at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97) at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172) at com.sun.jersey.core.header.MediaTypes. (MediaTypes.java:65) at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) at com.sun.jersey.api.client.Client.init(Client.java:342) at com.sun.jersey.api.client.Client.access$000(Client.java:118) at com.sun.jersey.api.client.Client$1.f(Client.java:191) at com.sun.jersey.api.client.Client$1.f(Client.java:187) at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) at com.sun.jersey.api.client.Client. (Client.java:187) at com.sun.jersey.api.client.Client. (Client.java:170) at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76) at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61) at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) at com.missfresh.Main.main(Main.java:142) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

我已经在lib下添加了javax.ws.rs-api-2.1.1.jar

*来自志愿者整理的flink邮件归档



参考答案:

这flink1.11啥情况啊,一启动就报 java.lang.LinkageError: ClassCastException: attempting to castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class

jersey.xxxx.jar javax.ws.xxx.jar我都放到lib了,怎么还是不行啊?这报的什么鬼东西?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370313?spm=a2c6h.12873639.article-detail.39.29d04378ApxdqJ



问题四:关于1.11Flink SQL 全新API设计的一些问题

简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL SDK

下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子

  1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
  2. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
  3. 调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
  4. 对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的

如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的

*来自志愿者整理的flink邮件归档



参考答案:

这个需求 我们也比较类似:

要获取注册的表信息,自己用stream+table 实现部分逻辑

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370312?spm=a2c6h.12873639.article-detail.40.29d04378ApxdqJ



问题五:关于1.11Flink SQL 全新API设计的一些问题

很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~

我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL SDK进行升级。经过初步调研发现,基于FactoryDynamicTable的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的StreamTableSource#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。

所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)

*来自志愿者整理的flink邮件归档



参考答案:

感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净, 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。

我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370310?spm=a2c6h.12873639.article-detail.41.29d04378ApxdqJ

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
Kubernetes Devops 应用服务中间件
基于 Azure DevOps 与阿里云 ACK 构建企业级 CI/CD 流水线
本文介绍如何结合阿里云 ACK 与 Azure DevOps 搭建自动化部署流程,涵盖集群创建、流水线配置、应用部署与公网暴露,助力企业高效落地云原生 DevOps 实践。
320 0
消息中间件 存储 传感器
211 0
|
9月前
|
存储 Kubernetes 调度
|
9月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
165 3
|
9月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
596 2
|
10月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1105 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
10月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
806 7
Flink Materialized Table:构建流批一体 ETL
|
11月前
|
SQL 监控 关系型数据库
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
本文整理自用友畅捷通数据架构师王龙强在FFA2024上的分享,介绍了公司在Flink上构建实时数仓的经验。内容涵盖业务背景、数仓建设、当前挑战、最佳实践和未来展望。随着数据量增长,公司面临数据库性能瓶颈及实时数据处理需求,通过引入Flink技术逐步解决了数据同步、链路稳定性和表结构差异等问题,并计划在未来进一步优化链路稳定性、探索湖仓一体架构以及结合AI技术推进数据资源高效利用。
777 25
用友畅捷通在Flink上构建实时数仓、挑战与最佳实践
|
12月前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
运维 Kubernetes Docker
利用Docker和Kubernetes构建微服务架构
利用Docker和Kubernetes构建微服务架构

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多