Question 1: How do I use a RateLimiter with Flink 1.12?
I found the classes FlinkConnectorRateLimiter and GuavaFlinkConnectorRateLimiter in the source code, but the Kafka-related classes do not expose any such configuration. How can a RateLimiter be used through the API (without modifying the source code)? *From the volunteer-compiled Flink mailing list archive
Reference answer:
Hello, what is your scenario, and what is the goal of the rate limiting? *From the volunteer-compiled Flink mailing list archive
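For readers with the same need: since, as the question notes, the Kafka connector classes in 1.12 expose no rate-limiter configuration, one workaround is to throttle in a user function chained right after the source. Below is a minimal sketch, not from the original thread; the ThrottleMap class name and the one-permit-per-record policy are illustrative assumptions, while GuavaFlinkConnectorRateLimiter and its setRate/open/acquire methods are the classes the question found in the source tree.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.configuration.Configuration;

// Hypothetical throttling operator: chain it directly after the source, e.g.
//   stream.map(new ThrottleMap<>(1000L)).name("throttle");
public class ThrottleMap<T> extends RichMapFunction<T, T> {
    private final long globalPermitsPerSecond;
    private transient FlinkConnectorRateLimiter rateLimiter;

    public ThrottleMap(long globalPermitsPerSecond) {
        this.globalPermitsPerSecond = globalPermitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        rateLimiter = new GuavaFlinkConnectorRateLimiter();
        // setRate takes the global rate; open() divides it among parallel subtasks
        rateLimiter.setRate(globalPermitsPerSecond);
        rateLimiter.open(getRuntimeContext());
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(1); // blocks until one permit is available
        return value;
    }
}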
More answers to this question can be found at:
https://developer.aliyun.com/ask/370057?spm=a2c6h.13066369.question.21.33bf585fVzEeb4
Question 2: How can I integrate CDH HBase 2.0 with Flink 1.11.2 on YARN?
Following the officially documented approach, I integrated Hadoop successfully with HADOOP_CLASSPATH=`hadoop classpath`. Because Flink on YARN runs on a CDH 6 cluster, I wanted to reuse the HBase libraries already present on the classpath:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH
I then created a Flink runtime environment with yarn-session, connected to it with sql-client, and created an HBase mapping table. This failed: the errors indicated that the HBase classes could not be found.
./bin/yarn-session.sh -d -s 4 -nm common-flink -jm 1024m -tm 4096m
./bin/sql-client.sh embedded -e conf/sql-env.yaml
sql-env.yaml:
configuration:
  execution.target: yarn-session
Copying the HBase jars into flink_home/lib instead dropped me straight into a deep hole, one class-loading error after another:
Attempt 1: ClassNotFoundException: org.apache.hadoop.hbase.client.HTable
Attempt 2: ClassNotFoundException: org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService
Attempt 3: ClassNotFoundException: org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$BlockingInterface
Attempt 4: copied hbase-shaded-client-2.1.0-cdh6.3.0.jar to lib; class conflicts kept the whole yarn-session from starting any container
Attempts 5/6/7: same as attempt 3
Attempts 8/9: ClassNotFoundException: org.apache.hbase.thirdparty.com.google.protobuf.RpcController
Attempt 9: ClassNotFoundException: org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup
Attempt 10: NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:272)
Everything up through attempt 9 was resolved by copying jars into lib. The HBase dependencies now in lib are:
hbase-client-2.1.0-cdh6.3.0.jar
hbase-common-2.1.0-cdh6.3.0.jar
hbase-protocol-2.1.0-cdh6.3.0.jar
hbase-protocol-shaded-2.1.0-cdh6.3.0.jar
hbase-shaded-miscellaneous-2.2.1.jar
hbase-shaded-netty-2.2.1.jar
hbase-shaded-protobuf-2.2.1.jar
For attempt 10, is there any fix other than modifying the source code? Or is there a better way to integrate HBase altogether? *From the volunteer-compiled Flink mailing list archive
Reference answer:
- What is the exact exception stack behind "the HBase classes could not be found"?
- Your steps never add a Flink HBase connector jar to lib; without it the HBase table factory cannot be discovered.
- Flink 1.11 does not yet ship an HBase 2.x connector jar.
- Flink 1.12 supports HBase 2.x, and in theory that connector is also compatible with a Flink 1.11 cluster.
So you can try downloading https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2_2.11/1.12.0/flink-sql-connector-hbase-2.2_2.11-1.12.0.jar into flink/lib (this jar already shades the HBase classes), then integrate Hadoop with HADOOP_CLASSPATH=`hadoop classpath`; that should work. See the Flink 1.12 documentation [1] for details. *From the volunteer-compiled Flink mailing list archive
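To make the last step concrete, here is a minimal sketch of creating an HBase mapping table through the Table API once the shaded connector jar is in lib. The table name, column family, and ZooKeeper address are hypothetical placeholders; 'connector' = 'hbase-2.2' is the factory identifier shipped in the connector jar named above, and the same DDL can be issued from sql-client.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseMappingTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // All names below are placeholders for your own HBase table and cluster.
        tEnv.executeSql(
                "CREATE TABLE hbase_dim (" +
                "  rowkey STRING," +
                "  cf ROW<name STRING, age INT>," +
                "  PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'hbase-2.2'," +
                "  'table-name' = 'my_hbase_table'," +
                "  'zookeeper.quorum' = 'zk-host:2181'" +
                ")");

        // Quick smoke test that the connector and classpath are wired up.
        tEnv.executeSql("SELECT * FROM hbase_dim").print();
    }
}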
More answers to this question can be found at:
https://developer.aliyun.com/ask/370058?spm=a2c6h.13066369.question.24.33bf585fmrHQLg
Question 3: [flink-1.10.2] How do I register the DataStream produced by async I/O as a table?
Flink version: 1.10.2
I use a RichAsyncFunction for async I/O, but the resulting DataStream cannot be registered as a table.
Locally, the test just keeps printing the same data over and over.
How can the DataStream, after this processing, be registered as a Table?
The code is as follows:
// async Redis processing
RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node, aggProcessorArgs);
// obtain the asynchronously processed stream
DataStream result = AsyncDataStream.orderedWait(dataStream, asyncFunction, 60L, TimeUnit.SECONDS, 100).returns(outRowTypeInfo);
// register as a temporary table
tabEnv.createTemporaryView("test_table", result, outRowFields.stream().collect(Collectors.joining(",")));
// result.print("out_table>>");
Table test_table = tabEnv.sqlQuery("select * from test_table");
// query the temporary table
tabEnv.toAppendStream(test_table, Row.class).print("test_table");
*From the volunteer-compiled Flink mailing list archive
Reference answer:
From your code it looks like the view is registered. Do you get an actual error?
Remember to call StreamExecutionEnvironment.execute() at the end to actually submit the job. *From the volunteer-compiled Flink mailing list archive
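For completeness, a minimal self-contained sketch of the same pipeline against the 1.10 API. The trivial uppercase AsyncFunction is a stand-in for the RedisAsyncFunction from the question; the final env.execute() is the part that is easy to forget, and without it the pipeline never runs.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class AsyncToTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        DataStream<String> source = env.fromElements("a", "b", "c");

        // Trivial stand-in for the RedisAsyncFunction: completes asynchronously.
        AsyncFunction<String, String> asyncFunction = new AsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                CompletableFuture
                        .supplyAsync(input::toUpperCase)
                        .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
            }
        };

        DataStream<String> result =
                AsyncDataStream.orderedWait(source, asyncFunction, 60L, TimeUnit.SECONDS, 100);

        // Register the async result stream as a temporary view and query it.
        tabEnv.createTemporaryView("test_table", result, "word");
        Table table = tabEnv.sqlQuery("SELECT word FROM test_table");
        tabEnv.toAppendStream(table, Row.class).print("test_table");

        // Without this call the job is never submitted.
        env.execute("async-io-to-table");
    }
}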
More answers to this question can be found at:
https://developer.aliyun.com/ask/370059?spm=a2c6h.13066369.question.25.33bf585fBYHz4Z
Question 4: Re: Flink 1.9.2 container HA deployment: the job id is always 000000000000000000
I ran into this problem too. How did you solve it in the end? Even switching to Flink 1.11.2 did not help! *From the volunteer-compiled Flink mailing list archive
Reference answer:
https://issues.apache.org/jira/browse/FLINK-19358 *From the volunteer-compiled Flink mailing list archive
More answers to this question can be found at:
https://developer.aliyun.com/ask/370060?spm=a2c6h.13066369.question.26.33bf585fClM81q
Question 5: How should I handle job submission failures with Flink 1.11.2 on YARN?
After upgrading from Flink 1.7.2 to 1.11.2, jobs can no longer be submitted; the code and the pom file were not modified at all. Submitting from the 1.7.2 client works without any problem; with 1.11.2 the job fails on startup. The log (./yarn logs **) is:

Container: container_1603495749855_55197_02_000001 on hadoop01
=====================================================================================
LogType:jobmanager.err
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:802
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/26/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/hadoop/dn/sdd/yarn/nm/usercache/jacob/appcache/application_1603495749855_55197/filecache/33/test_job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

LogType:jobmanager.log
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:980
Log Contents:
2020-12-09 17:03:31,918 WARN org.apache.hadoop.conf.Configuration [] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an attempt to override final parameter: hadoop.ssl.require.client.cert; Ignoring.
2020-12-09 17:03:31,931 WARN org.apache.hadoop.conf.Configuration [] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an attempt to override final parameter: hadoop.ssl.keystores.factory.class; Ignoring.
2020-12-09 17:03:31,931 WARN org.apache.hadoop.conf.Configuration [] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an attempt to override final parameter: hadoop.ssl.server.conf; Ignoring.
2020-12-09 17:03:31,932 WARN org.apache.hadoop.conf.Configuration [] - /run/cloudera-scm-agent/process/19140-yarn-NODEMANAGER/core-site.xml:an attempt to override final parameter: hadoop.ssl.client.conf; Ignoring.

LogType:jobmanager.out
Log Upload Time:Wed Dec 09 17:03:38 -0800 2020
LogLength:2188
Log Contents:
2020-12-09 17:03:36.375 [main] ERROR o.a.f.r.entrypoint.ClusterEntrypoint - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
at org.apache.hadoop.yarn.conf.YarnConfiguration.addDeprecatedKeys(YarnConfiguration.java:79)
at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:73)
at org.apache.flink.yarn.YarnResourceManager.<init>(YarnResourceManager.java:155)
at org.apache.flink.yarn.entrypoint.YarnResourceManagerFactory.createResourceManager(YarnResourceManagerFactory.java:76)
at org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(ResourceManagerFactory.java:61)
at org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory.createResourceManager(ActiveResourceManagerFactory.java:58)
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
... 2 common frames omitted

Container: container_1603495749855_55197_01_000001 on hadoop01
=====================================================================================
(The first attempt's container logs are near-identical: the same SLF4J multiple-binding warnings, differing only in that one binding comes from log4j-slf4j-impl-2.12.1.jar instead of test_job.jar, the same four core-site.xml "attempt to override final parameter" warnings, and the identical ClusterEntrypointException / NoSuchMethodError stack trace shown above.)

[pom file] (the XML tags were stripped in the archive; the recoverable properties and dependencies are:)
modelVersion 4.0.0; groupId/artifactId masked as ***; version 0.0.1-SNAPSHOT
Properties: UTF-8 source encoding; Java 1.8; flink.version = 1.11.2; scala.binary.version = 2.11; kafka.version = 0.10; hive.version = 1.1.0-cdh5.8.3
Dependencies:
org.apache.orc:orc-core:1.5.4
org.redisson:redisson:3.12.4
com.microsoft.sqlserver:mssql-jdbc:6.2.2.jre8
org.apache.hive:hive-exec:${hive.version} (with exclusions)
ch.qos.logback:logback-classic:1.2.3
com.sun.mail:javax.mail:1.6.2
org.apache.flink:flink-connector-kafka-${kafka.version}_${scala.binary.version}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.binary.version}:${flink.version} (provided)
org.apache.flink:flink-connector-filesystem_${scala.binary.version}:${flink.version}
junit:junit:4.12 (test)
org.apache.hive:hive-jdbc:1.1.0
org.slf4j:slf4j-api:1.7.30
Thanks!
*From the volunteer-compiled Flink mailing list archive
Reference answer:
After upgrading from Flink 1.7.2 to 1.11.2, jobs cannot be submitted; code and pom file were not modified. Submitting from the 1.7.2 client has no problems; with 1.11.2 the job fails at startup with the log above (./yarn logs ****):
Container: container_1603495749855_55197_02_000001 on hadoop01 *From the volunteer-compiled Flink mailing list archive
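The reference answer above only quotes the question, but the Caused-by line suggests a classpath conflict: some jar bundles an old org.apache.hadoop.conf.Configuration that lacks addDeprecations(DeprecationDelta[]). A quick way to see which jar a disputed class is actually loaded from is the generic diagnostic below (a sketch, not from the original thread); run it with the same classpath as the failing JobManager.

// Prints the jar (code source) a disputed class is loaded from, which
// identifies the bundle shadowing the expected Hadoop classes.
public class WhichJar {
    public static void main(String[] args) throws Exception {
        Class<?> c = Class.forName("org.apache.hadoop.conf.Configuration");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}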
More answers to this question can be found at:
https://developer.aliyun.com/ask/370061?spm=a2c6h.13066369.question.27.33bf585fv132T2