Question 1: Flink 1.11 on Kubernetes deployment failure
Following the new version's deployment files [1], the deployment fails. If I use the 1.10 deployment files instead, changing only the image and the log4j file, everything comes up successfully [2].
The difference seems to be that 1.11 starts the JobManager and TaskManager via args: ["jobmanager"]; in docker-entrypoint.sh [3] you can see that when set_common_options is called, it runs sed against the locally mounted flink-configuration-configmap.yaml, and that fails.
1.10 instead starts via $FLINK_HOME/bin/jobmanager.sh and then tails the log to keep the container in the foreground:
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
while :;
do
if [[ -f $(find log -name 'jobmanager.log' -print -quit) ]];
then tail -f -n +1 log/jobmanager.log;
fi;
done"]
If you run into this problem, deploying the 1.11 image with the 1.10-style deployment files works. If anyone has gotten the 1.11-style deployment to work, please share.
[1]
[2]
[3] https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
*From the Flink mailing list archive, curated by volunteers
Reference answer:
The sed replacement error should not be the root cause of the Pod failing to start; the current docker-entrypoint.sh only behaves this way because it has been modified [1].
Your error looks like it was thrown while executing bash-java-utils.jar. Please confirm that you are using the community YAML files [2]; they run without problems for me.
If not, please post your YAML.
[1] https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
*From the Flink mailing list archive, curated by volunteers
More answers to this question can be found at:
https://developer.aliyun.com/ask/370317?spm=a2c6h.12873639.article-detail.37.29d04378ApxdqJ
Question 2: Primary key declaration in Flink SQL 1.11
While using PyFlink 1.11, I declared a primary key (PRIMARY KEY) on the sink of a Flink SQL dimension-table join, but it still fails with an error saying that no primary key was declared. Also, when I use an inner join instead of a left join, the problem goes away. What is going on?
The error message and the code are attached below. Thanks!
Appendix: error message
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "mysql_join.py", line 90, in
from_kafka_to_kafka_demo()
File "mysql_join.py", line 22, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'
Appendix: code
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble


def from_kafka_to_kafka_demo():
    # use the blink table planner
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    st_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                           environment_settings=env_settings)
    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    register_mysql_source(st_env)
    st_env.sql_update(
        "insert into flink_result "
        "select cast(t1.id as int) as id, cast(t2.type as varchar), "
        "cast(t1.time1 as bigint) as rowtime "
        "from source1 t1 left join dim_mysql t2 on t1.type = cast(t2.id as varchar)")
    st_env.execute("2-from_kafka_to_kafka")


def register_rides_source(st_env):
    source_ddl = """
        create table source1 (
            id int,
            time1 varchar,
            type string
        ) with (
            'connector.type' = 'kafka',
            'connector.topic' = 'tp1',
            'connector.startup-mode' = 'latest-offset',
            'connector.properties.bootstrap.servers' = 'localhost:9092',
            'connector.properties.zookeeper.connect' = 'localhost:2181',
            'format.type' = 'json',
            'connector.version' = 'universal',
            'update-mode' = 'append'
        )
    """
    st_env.sql_update(source_ddl)


def register_mysql_source(st_env):
    source_ddl = """
        CREATE TABLE dim_mysql (
            id int,
            type varchar
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_test',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.lookup.cache.max-rows' = '5000',
            'connector.lookup.cache.ttl' = '1min',
            'connector.lookup.max-retries' = '3'
        )
    """
    st_env.sql_update(source_ddl)


def register_rides_sink(st_env):
    sink_ddl = """
        CREATE TABLE flink_result (
            id int,
            type varchar,
            rtime bigint,
            primary key(id) NOT ENFORCED
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_result',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.write.flush.max-rows' = '5000',
            'connector.write.flush.interval' = '2s',
            'connector.write.max-retries' = '3'
        )
    """
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()
*From the Flink mailing list archive, curated by volunteers
Reference answer:
Your connectors' WITH options are still written in the old style rather than with the new connector options [1], so the statement goes down the legacy code path, and the legacy code does not support declared primary keys. On the legacy path the primary key is derived from the query; once you replace the left join with an inner join, the key presumably becomes derivable, which is why the error disappears. I believe updating the connectors' WITH options to the new style will solve the problem.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options
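For illustration, here is a minimal sketch of the sink DDL rewritten with the new 1.11 connector options (the option keys follow the JDBC connector page in [1]; the URL, table name, and credentials are placeholders carried over from the question):

def register_rides_sink(st_env):
    # New-style options ('connector' = 'jdbc', 'url', 'table-name', ...) route the
    # table to the new DynamicTableSink code path, which honors the declared PRIMARY KEY.
    sink_ddl = """
        CREATE TABLE flink_result (
            id INT,
            type VARCHAR,
            rtime BIGINT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://localhost:3390/test',
            'table-name' = 'flink_result',
            'driver' = 'com.mysql.cj.jdbc.Driver',
            'username' = '***',
            'password' = '***',
            'sink.buffer-flush.max-rows' = '5000',
            'sink.buffer-flush.interval' = '2s',
            'sink.max-retries' = '3'
        )
    """
    st_env.sql_update(sink_ddl)

The Kafka source and the JDBC dimension table would need the same migration ('connector' = 'kafka' with 'format' = 'json', and 'connector' = 'jdbc' with the 'lookup.cache.*' options, respectively), so that the whole query stays on the new code path.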
*From the Flink mailing list archive, curated by volunteers
More answers to this question can be found at:
https://developer.aliyun.com/ask/370315?spm=a2c6h.12873639.article-detail.38.29d04378ApxdqJ
Question 3: Flink 1.11 startup error
Flink 1.11 fails at startup with:
java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
    at javax.ws.rs.ext.RuntimeDelegate.findDelegate(RuntimeDelegate.java:125)
    at javax.ws.rs.ext.RuntimeDelegate.getInstance(RuntimeDelegate.java:97)
    at javax.ws.rs.core.MediaType.valueOf(MediaType.java:172)
    at com.sun.jersey.core.header.MediaTypes.<clinit>(MediaTypes.java:65)
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
    at com.sun.jersey.api.client.Client.init(Client.java:342)
    at com.sun.jersey.api.client.Client.access$000(Client.java:118)
    at com.sun.jersey.api.client.Client$1.f(Client.java:191)
    at com.sun.jersey.api.client.Client$1.f(Client.java:187)
    at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
    at com.sun.jersey.api.client.Client.<init>(Client.java:187)
    at com.sun.jersey.api.client.Client.<init>(Client.java:170)
    at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:280)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:169)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:76)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:61)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:43)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:64)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1812)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
    at com.missfresh.Main.main(Main.java:142)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
I have already added javax.ws.rs-api-2.1.1.jar under lib.
*From the Flink mailing list archive, curated by volunteers
Reference answer:
What is going on with Flink 1.11? As soon as it starts it throws java.lang.LinkageError: ClassCastException: attempting to cast jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class to jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
I have put jersey.xxxx.jar and javax.ws.xxx.jar into lib as well; why does it still fail? What is this error?
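A note for readers (this diagnosis is not from the original thread): the first stack trace shows javax/ws/rs/ext/RuntimeDelegate being loaded both from the user jar (/data/rt/jar_version/sql/6.jar) and from lib/javax.ws.rs-api-2.1.1.jar, i.e. the same class sits on both the user classpath and the cluster classpath, and Flink's child-first user classloader ends up with two incompatible copies. Commonly suggested remedies are to stop bundling the javax.ws.rs classes in the job jar (for example by marking that dependency as provided), or to make those classes load parent-first by adding classloader.parent-first-patterns.additional: javax.ws.rs to flink-conf.yaml.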
*From the Flink mailing list archive, curated by volunteers
More answers to this question can be found at:
https://developer.aliyun.com/ask/370313?spm=a2c6h.12873639.article-detail.39.29d04378ApxdqJ
Question 4: Some questions about the new Flink SQL API design in 1.11
A bit of background: part of my team's work is to let users write SQL in a web page; we turn that SQL into Flink jobs and run them on our platform, and this translation relies on our SQL SDK.
Below are a few cases that are common for us but seem hard to implement with the new 1.11 API:
- We have a proprietary Kafka data format in which one Kafka record is converted into n rows (n a positive integer). Our approach is to add a process/flatMap stage when emitDataStream is called to handle this, transparently to the user.
- We have wrapped a number of sinks of our own, and we add a process/filter stage in front of the sink for buffering, mini-batching, data filtering, and so on.
- Adjusting the source/sink parallelism, or pinning it to a user-specified value, is also done at the DataStream level.
- Some special sources and sinks are combined with keyBy operations (transparently to the user); this is likewise done at the DataStream level.
If possible, being able to get hold of the Transformation at the API level would also satisfy these needs.
*From the Flink mailing list archive, curated by volunteers
Reference answer:
We have a very similar requirement:
we need access to the information of the registered tables, and we implement part of the logic ourselves with a mix of the DataStream and Table APIs.
*From the Flink mailing list archive, curated by volunteers
More answers to this question can be found at:
https://developer.aliyun.com/ask/370312?spm=a2c6h.12873639.article-detail.40.29d04378ApxdqJ
Question 5: Some questions about the new Flink SQL API design in 1.11
Great to see Flink 1.11 released, with FLIP-95 and FLIP-105 successfully landed!
We have recently been investigating an upgrade of our legacy SQL SDK to the 1.11 SQL/Table API. Our initial finding is that with the Factory- and DynamicTable-based API, a CatalogTable is turned by the planner directly into a Transformation, rather than exposing a DataStream as before. For example, through the old StreamTableSource#getDataStream I could obtain the DataStream object directly. That method matters a great deal to us, because many internal operations of our SDK are triggered/completed when it is called.
So, if we build on the new API, how can I get at the DataStream, or achieve a similar effect (ideally without touching the execution plan)?
*From the Flink mailing list archive, curated by volunteers
Reference answer:
Thanks for sharing. My understanding is that the new 1.11 API mainly aims to separate the Table API and the DataStream API as cleanly as possible, but it sounds like platform-level development relies on DataStream-level pre-processing and user logic.
I think this kind of requirement is reasonable for platform development; we can collect this feedback. cc: godfrey
*From the Flink mailing list archive, curated by volunteers
More answers to this question can be found at:
https://developer.aliyun.com/ask/370310?spm=a2c6h.12873639.article-detail.41.29d04378ApxdqJ