开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

我有两套环境 用的一个flink,kafka在A集群,HBase在B 集群,我在其他节点安装了一个?

我有两套环境 用的一个flink,kafka在A集群,HBase在B 集群,我在其他节点安装了一个flink,然后用flink将A集群的kafka的数据和B集群的hbase的数据做join,但是在flink中无法同时读到这两个服务的数据 把kafka 的connector 放进去就读不到hbase了,一直报zookeeper断开链接,我的hbase是华为的MRS的环境,添加了kerberos的认证?我flink起的yarn session模式

展开
收起
cuicuicuic 2023-06-11 22:08:00 96 0
4 条回答
写回答
取消 提交回答
  • 可以尝试引入引入kafka connector和hbase的依赖:

    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
       <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>1.3.1</version>
    </dependency>
    

    或者自己下载以下jar包,放入到外部lib中:

    image.png

    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
    
    
    /**
      * flink以kafka为source,以hbase为sink
      */
    object FlinkKafkaHbase {
    
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE)
        //消费者配置
        val properties = new Properties
        properties.put("bootstrap.servers", "hadoop02:9092,hadoop03:9092,hadoop04:9092")
        properties.put("group.id", "g3")
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        //创建消费者 入参(topic,序列化schema,消费者配置)
        val consumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), properties)
        consumer.setStartFromEarliest() //设置从最开始消费
        /**
          * 这里有个问题:为什么设置为从最开始消费,每次重启项目都会消费所有旧消息呢?
          * 在实际开发中,如果重启就会消费所有旧消息,肯定是有问题的。
          */
        //以kafkaConsumer作为source创建流
        val stream: DataStream[String] = env.addSource(consumer).setParallelism(1)//设置并行度为1
        //处理逻辑
        val filterStream = stream.filter(_.contains("student"))
    
        val hbaseSink = new HbaseSink("student","info")
        filterStream.addSink(hbaseSink)
        env.execute("flink_kafka_hbase")
      }
    }
    

    ——参考链接

    2024-01-25 21:11:26
    赞同 1 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维工作,同时兼顾教育行业、企业等src安全漏洞挖掘,曾获全国行业网络安全大赛二等奖。

    由于你提到的 Zookeeper 断开连接问题,很可能是由于某种原因导致 Zookeeper 未能建立正确的连接或者没有正确配置。

    以下是几个可能导致此问题的原因及相应的解决方案建议:

    • ZooKeeper 不兼容性

    华为 MRS 中的 HBase 可能使用的不是标准的 ZooKeeper 版本,而是经过特殊调整后的版本。

    解决方案:检查 ZooKeeper 版本是否符合要求,如有必要请升级或替换 ZooKeeper 版本。

    • Kerberos 认证未正确配置

    当前的 Kerberos 配置可能不足以支持同时连接到 Kafka 和 HBase。

    解决方案:重新审查和校验 Kerberos 配置,确保已包含所有必要的权限和凭据。

    • Flink Session 模式问题

    Flink Session 模式的配置可能阻止了同时读取 Kafka 和 HBase 的能力。

    解决方案:查看 Flink Session 模型的相关文档,并确保配置允许同时读写多个数据源和服务。

    • JVM 内存不足

    JVM 内存不足也可能导致连接器无法正确加载和运行。

    解决方案:增大 Java heap size (Xmx), Xms, etc., to accommodate both connectors at once.

    • 网络问题

    网络问题也有可能阻碍了 Kafka 和 HBase 的连接。

    解决方案:检查网络连通性,包括防火墙设置、路由表、DNS解析等问题。

    综上所述,你应该逐一排除这些可能性,直到找到真正的原因为止。另外,强烈推荐先在单机版的 Flink 测试环境中模拟同样的作业,以帮助定位问题所在。一旦找到了问题的根本原因,就可以着手寻找合适的解决方案。

    2024-01-19 15:38:53
    赞同 展开评论 打赏
  • 在您的场景中,Flink试图同时从两个不同的集群(Kafka集群和HBase集群,后者位于华为MRS环境中并启用了Kerberos认证)读取数据,并在YARN session模式下执行join操作。遇到Zookeeper连接断开的问题,可能是以下几个原因造成的:

    1. 网络配置:
      确保运行Flink任务的节点能同时与Kafka集群和HBase集群的Zookeeper服务保持稳定的网络连接。如果网络不通或防火墙规则阻挡了必要的通信,则可能导致连接断开。

    2. HBase与ZooKeeper连接参数:
      检查Flink HBase connector配置,确保指定了正确的Zookeeper地址列表和端口。由于HBase是通过Zookeeper来进行元数据管理,所以这里的连接参数需要正确反映华为MRS环境中的Zookeeper服务位置。

    3. Kerberos认证:
      对于启用了Kerberos的HBase集群,确保在Flink的HBase connector配置中包含了必要的Kerberos认证信息,例如keytab文件路径、principal以及相关Kerberos配置。同时,需要确保这些认证信息能在Flink运行的节点上有效加载。

    4. 资源隔离:
      在YARN session模式下,Flink jobmanager和taskmanager容器可能受限于YARN的资源隔离,如果资源分配不足,可能会造成服务不稳定,间接导致连接断开。检查YARN container的资源分配,确保它们足以支撑Flink任务及其依赖的服务连接。

    5. HBase和Zookeeper的安全策略:
      华为MRS环境可能会有特定的安全策略,比如白名单、IP限制等,需要确认运行Flink任务的节点是否被正确授权访问HBase和Zookeeper服务。

    解决此类问题时,可以按照以下步骤进行排查:

    • 检查并验证Flink任务的HBase和Kafka连接器配置。
    • 查看Flink任务的运行日志,寻找有关Zookeeper连接失败的详细错误信息。
    • 检查运行节点到Zookeeper和HBase集群的网络状况和安全策略。
    • 确保在运行Flink任务的节点上成功完成了Kerberos身份验证流程。
    • 根据需要调整YARN容器的资源配额。
    2024-01-15 15:13:32
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    可能遇到了一些网络连接问题,导致Flink无法同时读取Kafka和HBase的数据。以下是一些建议,希望能帮助你解决问题:

    1. 检查Kafka和HBase的连接地址是否正确。确保Flink能够正确连接到Kafka和HBase集群,并且可以读取数据。你可以在Flink的配置文件中检查这些地址,并确保它们指向正确的集群和节点。
    1. 检查Zookeeper是否正常运行。如果你使用的是Kerberos认证,那么Zookeeper是必需的。请确保Zookeeper在A集群和B集群上都正常运行,并且可以访问。你可以在Flink的配置文件中检查Zookeeper的地址,并确保它指向正确的Zookeeper集群。
    1. 检查网络连接是否正常。如果你在不同的集群上运行Flink,那么网络连接可能会出现问题。请确保Flink能够访问Kafka和HBase集群,并且可以在它们之间传输数据。你可以在Flink的日志中查找有关网络连接问题的警告或错误消息。
    1. 尝试使用更详细的日志记录。你可以在Flink的配置文件中启用更详细的日志记录,以便更好地了解Flink在运行时遇到的问题。你可以在Flink的文档中找到有关如何配置日志记录的说明。
    2024-01-12 21:47:36
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载