我有两套环境 用的一个flink,kafka在A集群,HBase在B 集群,我在其他节点安装了一个flink,然后用flink将A集群的kafka的数据和B集群的hbase的数据做join,但是在flink中无法同时读到这两个服务的数据 把kafka 的connector 放进去就读不到hbase了,一直报zookeeper断开链接,我的hbase是华为的MRS的环境,添加了kerberos的认证?我flink起的yarn session模式
可以尝试引入引入kafka connector和hbase的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
或者自己下载以下jar包,放入到外部lib中:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
* flink以kafka为source,以hbase为sink
*/
object FlinkKafkaHbase {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE)
//消费者配置
val properties = new Properties
properties.put("bootstrap.servers", "hadoop02:9092,hadoop03:9092,hadoop04:9092")
properties.put("group.id", "g3")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//创建消费者 入参(topic,序列化schema,消费者配置)
val consumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), properties)
consumer.setStartFromEarliest() //设置从最开始消费
/**
* 这里有个问题:为什么设置为从最开始消费,每次重启项目都会消费所有旧消息呢?
* 在实际开发中,如果重启就会消费所有旧消息,肯定是有问题的。
*/
//以kafkaConsumer作为source创建流
val stream: DataStream[String] = env.addSource(consumer).setParallelism(1)//设置并行度为1
//处理逻辑
val filterStream = stream.filter(_.contains("student"))
val hbaseSink = new HbaseSink("student","info")
filterStream.addSink(hbaseSink)
env.execute("flink_kafka_hbase")
}
}
——参考链接。
由于你提到的 Zookeeper 断开连接问题,很可能是由于某种原因导致 Zookeeper 未能建立正确的连接或者没有正确配置。
以下是几个可能导致此问题的原因及相应的解决方案建议:
华为 MRS 中的 HBase 可能使用的不是标准的 ZooKeeper 版本,而是经过特殊调整后的版本。
解决方案:检查 ZooKeeper 版本是否符合要求,如有必要请升级或替换 ZooKeeper 版本。
当前的 Kerberos 配置可能不足以支持同时连接到 Kafka 和 HBase。
解决方案:重新审查和校验 Kerberos 配置,确保已包含所有必要的权限和凭据。
Flink Session 模式的配置可能阻止了同时读取 Kafka 和 HBase 的能力。
解决方案:查看 Flink Session 模型的相关文档,并确保配置允许同时读写多个数据源和服务。
JVM 内存不足也可能导致连接器无法正确加载和运行。
解决方案:增大 Java heap size (Xmx), Xms, etc., to accommodate both connectors at once.
网络问题也有可能阻碍了 Kafka 和 HBase 的连接。
解决方案:检查网络连通性,包括防火墙设置、路由表、DNS解析等问题。
综上所述,你应该逐一排除这些可能性,直到找到真正的原因为止。另外,强烈推荐先在单机版的 Flink 测试环境中模拟同样的作业,以帮助定位问题所在。一旦找到了问题的根本原因,就可以着手寻找合适的解决方案。
在您的场景中,Flink试图同时从两个不同的集群(Kafka集群和HBase集群,后者位于华为MRS环境中并启用了Kerberos认证)读取数据,并在YARN session模式下执行join操作。遇到Zookeeper连接断开的问题,可能是以下几个原因造成的:
网络配置:
确保运行Flink任务的节点能同时与Kafka集群和HBase集群的Zookeeper服务保持稳定的网络连接。如果网络不通或防火墙规则阻挡了必要的通信,则可能导致连接断开。
HBase与ZooKeeper连接参数:
检查Flink HBase connector配置,确保指定了正确的Zookeeper地址列表和端口。由于HBase是通过Zookeeper来进行元数据管理,所以这里的连接参数需要正确反映华为MRS环境中的Zookeeper服务位置。
Kerberos认证:
对于启用了Kerberos的HBase集群,确保在Flink的HBase connector配置中包含了必要的Kerberos认证信息,例如keytab文件路径、principal以及相关Kerberos配置。同时,需要确保这些认证信息能在Flink运行的节点上有效加载。
资源隔离:
在YARN session模式下,Flink jobmanager和taskmanager容器可能受限于YARN的资源隔离,如果资源分配不足,可能会造成服务不稳定,间接导致连接断开。检查YARN container的资源分配,确保它们足以支撑Flink任务及其依赖的服务连接。
HBase和Zookeeper的安全策略:
华为MRS环境可能会有特定的安全策略,比如白名单、IP限制等,需要确认运行Flink任务的节点是否被正确授权访问HBase和Zookeeper服务。
解决此类问题时,可以按照以下步骤进行排查:
可能遇到了一些网络连接问题,导致Flink无法同时读取Kafka和HBase的数据。以下是一些建议,希望能帮助你解决问题:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。