Flink构建问题之flink 1.11 on kubernetes构建失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 on kubernetes 构建失败

按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。

目前看差别在于1.11启动jm和tm是通过args:

["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed

本地挂载的flink-configuration-configmap.yaml导致失败。

1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。

command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\

while :;

do

if [[ -f $(find log -name 'jobmanager.log' -print -quit) ]];

then tail -f -n +1 log/jobmanager.log;

fi;

done"]

如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。 1.11 版本的部署方式如果有大佬可以走通的,求分享。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions

[3]

https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh

*来自志愿者整理的flink邮件归档



参考答案:

sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改

才会这样[1]

你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。

如果不是,需要你把你的yaml发出来

[1].

https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

[2].

https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370317?spm=a2c6h.12873639.article-detail.37.29d04378ApxdqJ



问题二:flinksql1.11中主键声明的问题

我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key 但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题

下面我附录了报错信息和代码。谢谢!

报错附录

Traceback (most recent call last):

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco

return f(*a, **kw)

File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value

format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.

: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

File "mysql_join.py", line 90, in

from_kafka_to_kafka_demo()

File "mysql_join.py", line 22, in from_kafka_to_kafka_demo

st_env.execute("2-from_kafka_to_kafka")

File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute

return JobExecutionResult(self._j_tenv.execute(job_name))

File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call

answer, self.gateway_client, self.target_id, self.name)

File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco

raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)

pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'

代码附录

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink

from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime

from pyflink.table.window import Tumble

def from_kafka_to_kafka_demo():

use blink table planner

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env_settings = EnvironmentSettings.Builder().use_blink_planner().build()

st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

register source and sink

register_rides_source(st_env)

register_rides_sink(st_env)

register_mysql_source(st_env)

st_env.sql_update("insert into flink_result select cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")

st_env.execute("2-from_kafka_to_kafka")

def register_rides_source(st_env):

source_ddl = \

"""

create table source1(

id int,

time1 varchar ,

type string

) with (

'connector.type' = 'kafka',

'connector.topic' = 'tp1',

'connector.startup-mode' = 'latest-offset',

'connector.properties.bootstrap.servers' = 'localhost:9092',

'connector.properties.zookeeper.connect' = 'localhost:2181',

'format.type' = 'json',

'connector.version' = 'universal',

'update-mode' = 'append'

)

"""

st_env.sql_update(source_ddl)

def register_mysql_source(st_env):

source_ddl = \

"""

CREATE TABLE dim_mysql (

id int, --

type varchar --

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_test',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.lookup.cache.max-rows' = '5000',

'connector.lookup.cache.ttl' = '1min',

'connector.lookup.max-retries' = '3'

)

"""

st_env.sql_update(source_ddl)

def register_rides_sink(st_env):

sink_ddl = \

"""

CREATE TABLE flink_result (

id int,

type varchar,

rtime bigint,

primary key(id) NOT ENFORCED

) WITH (

'connector.type' = 'jdbc',

'connector.url' = 'jdbc:mysql://localhost:3390/test',

'connector.table' = 'flink_result',

'connector.driver' = 'com.mysql.cj.jdbc.Driver',

'connector.username' = '***',

'connector.password' = '***',

'connector.write.flush.max-rows' = '5000',

'connector.write.flush.interval' = '2s',

'connector.write.max-retries' = '3'

)

"""

st_env.sql_update(sink_ddl)

if name == 'main':

from_kafka_to_kafka_demo()

*来自志愿者整理的flink邮件归档



参考答案:

你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370315?spm=a2c6h.12873639.article-detail.38.29d04378ApxdqJ



问题三:flink1.11启动问题

Flink1.11启动时报错: java.lang.LinkageError: ClassCastException: attempting to castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125) at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97) at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172) at com.sun.jersey.core.header.MediaTypes. (MediaTypes.java:65) at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182) at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175) at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162) at com.sun.jersey.api.client.Client.init(Client.java:342) at com.sun.jersey.api.client.Client.access$000(Client.java:118) at com.sun.jersey.api.client.Client$1.f(Client.java:191) at com.sun.jersey.api.client.Client$1.f(Client.java:187) at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193) at com.sun.jersey.api.client.Client. (Client.java:187) at com.sun.jersey.api.client.Client. (Client.java:170) at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163) at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76) at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61) at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) at com.missfresh.Main.main(Main.java:142) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

我已经在lib下添加了javax.ws.rs-api-2.1.1.jar

*来自志愿者整理的flink邮件归档



参考答案:

这flink1.11啥情况啊,一启动就报 java.lang.LinkageError: ClassCastException: attempting to castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class

jersey.xxxx.jar javax.ws.xxx.jar我都放到lib了,怎么还是不行啊?这报的什么鬼东西?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370313?spm=a2c6h.12873639.article-detail.39.29d04378ApxdqJ



问题四:关于1.11Flink SQL 全新API设计的一些问题

简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL SDK

下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子

  1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
  2. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
  3. 调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
  4. 对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的

如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的

*来自志愿者整理的flink邮件归档



参考答案:

这个需求 我们也比较类似:

要获取注册的表信息,自己用stream+table 实现部分逻辑

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370312?spm=a2c6h.12873639.article-detail.40.29d04378ApxdqJ



问题五:关于1.11Flink SQL 全新API设计的一些问题

很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~

我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL SDK进行升级。经过初步调研发现,基于FactoryDynamicTable的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的StreamTableSource#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。

所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)

*来自志愿者整理的flink邮件归档



参考答案:

感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净, 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。

我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370310?spm=a2c6h.12873639.article-detail.41.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
运维 Kubernetes 监控
构建高效自动化运维体系:基于Docker和Kubernetes的实践指南
【2月更文挑战第30天】 在当今快速发展的云计算时代,传统的IT运维模式已难以满足业务的敏捷性和稳定性需求。本文深入探讨了如何通过Docker容器化技术和Kubernetes集群管理工具构建一个高效、可靠的自动化运维体系。文章首先概述了容器化技术和微服务架构的基本概念,随后详细阐述了基于Docker的应用打包、部署流程,以及Kubernetes在自动化部署、扩展和管理容器化应用中的关键作用。最后,文中通过案例分析,展示了如何在实际场景中利用这些技术优化运维流程,提高系统的整体效率和可靠性。
|
30天前
|
Kubernetes 流计算 Perl
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
34 7
|
2天前
|
存储 运维 Kubernetes
构建高效自动化运维体系:Ansible与Kubernetes的协同策略
【4月更文挑战第25天】 在当今快速迭代的软件开发过程中,自动化运维已成为提升效率、保证一致性和降低人为错误的关键。本文将探讨如何利用Ansible作为配置管理工具,以及Kubernetes作为容器编排系统,共同构建一个高效、可靠的自动化运维体系。文章首先概述了自动化运维的基本概念及其重要性,随后详细分析了Ansible与Kubernetes在自动化流程中的作用与优势,并通过一系列实践案例,展示了两者如何协同工作以优化部署、扩缩容和灾难恢复等关键运维任务。最后,文中还讨论了在实际应用中可能遇到的挑战及相应的解决策略,为读者提供了一套完整的自动化运维解决方案参考。
|
13天前
|
Kubernetes 监控 Cloud Native
构建高效云原生应用:基于Kubernetes的微服务治理实践
【4月更文挑战第13天】 在当今数字化转型的浪潮中,企业纷纷将目光投向了云原生技术以支持其业务敏捷性和可扩展性。本文深入探讨了利用Kubernetes作为容器编排平台,实现微服务架构的有效治理,旨在为开发者和运维团队提供一套优化策略,以确保云原生应用的高性能和稳定性。通过分析微服务设计原则、Kubernetes的核心组件以及实际案例,本文揭示了在多变的业务需求下,如何确保系统的高可用性、弹性和安全性。
17 4
|
28天前
|
运维 Kubernetes 持续交付
构建高效自动化运维体系:基于Docker和Kubernetes的最佳实践
在现代云计算环境中,自动化运维成为保障系统稳定性与提升效率的关键。本文深入探讨了如何利用Docker容器化技术和Kubernetes容器编排工具构建一个高效、可靠的自动化运维体系。文中不仅介绍了相关的技术原理,还结合具体案例分析了实施过程中的常见问题及解决方案,为读者提供了一套行之有效的最佳实践指南。
|
1月前
|
Kubernetes 开发者 Docker
构建高效微服务架构:Docker与Kubernetes的完美搭档
【2月更文挑战第29天】在当今快速发展的软件开发领域,微服务架构已成为提高系统可维护性、扩展性和敏捷性的关键解决方案。本文将深入探讨如何利用Docker容器化技术和Kubernetes集群管理工具,共同构建一个既高效又可靠的微服务环境。我们将分析Docker和Kubernetes的核心功能,并展示它们如何协同工作以简化部署流程、增强服务发现机制以及实现无缝的服务伸缩。通过实际案例分析,本文旨在为开发者提供一套实用的微服务架构设计和实施指南。
|
1月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
37 10
|
1月前
|
SQL 关系型数据库 MySQL
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
145 0
|
2月前
|
SQL 消息中间件 Kubernetes
flink问题之on kubernetes 构建失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
45 1
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5

相关产品

  • 实时计算 Flink版