开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink获取消费kafka的时候始终获取不到topic列表是啥原因啊?

Flink获取消费kafka的时候始终获取不到topic列表是啥原因啊?Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:283)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
... 7 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [board_quote_topic].
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:268)
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics 报错信息

展开
收起
三分钟热度的鱼 2024-05-22 20:49:13 476 0
5 条回答
写回答
取消 提交回答
  • 在使用 Apache Flink 消费 Kafka 时,遇到无法获取 topic 列表的问题,报错信息为 Failed to list subscribed topic partitions。根据提供的错误日志,问题的可能原因包括以下几种:

    Kafka Broker 配置问题:
    TimeoutException: Timed out waiting for a node assignment 表示在请求描述主题(describeTopics)时,等待 Kafka broker 分配节点超时。这可能是因为 Kafka broker 没有正确启动或者配置错误。
    检查 Kafka broker 是否在运行并且可以正常连接。
    确认 Kafka broker 的配置文件中 listeners 和 advertised.listeners 设置正确,确保客户端可以访问。
    网络连接问题:
    确保 Flink 集群与 Kafka 集群之间的网络连接正常,没有防火墙或网络策略阻止通信。
    检查 Flink 和 Kafka 的网络配置,确保没有网络延迟或丢包。
    Kafka 客户端配置问题:
    检查 Flink 应用程序中 Kafka 客户端的配置,特别是 Kafka broker 列表 (bootstrap.servers) 是否正确。
    增加超时时间配置,例如 request.timeout.ms 和 session.timeout.ms,以应对可能的网络延迟问题。
    Kafka 主题问题:
    确认主题 board_quote_topic 是否存在且可以从 Kafka broker 获取到。
    使用 Kafka 命令行工具或 Kafka 管理界面检查主题的状态和分区信息。
    权限问题:
    确认 Flink 应用程序的 Kafka 客户端有权限访问 board_quote_topic 主题。
    检查 Kafka 的 ACL 配置,确保 Flink 客户端具有 Describe 权限。

    2024-08-03 16:45:19
    赞同 展开评论 打赏
  • 这个问题通常是由于Flink与Kafka之间的网络连接问题导致的。您需要检查以下几个方面:

    确认Flink作业与Kafka集群是否在同一个VPC内。
    检查Kafka的bootstrap.servers配置是否正确,确保Flink能够连接到Kafka的broker。
    确认Kafka的白名单配置,如果使用的是云消息队列Kafka版,确保Flink的IP在白名单内。
    检查网络是否有代理、端口转发等设置,确保这些设置允许Flink访问Kafka。
    使用Zookeeper命令行工具获取Kafka broker元信息,并验证与Flink的连通性。
    可参考网络连接排查
    image.png

    2024-07-26 14:28:27
    赞同 展开评论 打赏
  • 问题似乎与Kafka broker的配置有关,而不是直接与FlinK SQL任务设置table.exec.state.ttl相关。不过,为了帮助你排查问题,我们可以一起检查一下Kafka broker的配置以及相关的网络和安全设置。

    Kafka Broker配置检查
    网络连接:
    确认Flink应用所在的网络可以访问Kafka broker的地址和端口。
    检查网络策略或防火墙设置,确保没有阻止Flink应用与Kafka broker之间的通信。
    Kafka ACL (Access Control Lists):
    确认Flink应用使用的消费者组或用户具有所需的权限。
    可以使用Kafka的kafka-acls.sh脚本来查看和管理ACL。image.png

    2024-07-26 09:50:16
    赞同 展开评论 打赏
  • 阿里云大降价~

    可能是Kafka Broker配置问题
    确认Kafka broker的配置允许来自Flink应用所在网络的连接,没有因网络策略或防火墙设置而被阻止。
    检查Kafka是否设置了ACL(访问控制列表),确保Flink应用使用的消费者组或用户具有读取所需主题的权限。
    还有检查Topic是否存在: 首先确认Kafka集群中确实存在您所订阅的主题board_quote_topic。可以通过Kafka的命令行工具或是Kafka管理界面来验证。

    2024-07-24 18:54:25
    赞同 展开评论 打赏
  • 可能的原因多台机器都要配置下/etc/hosts的配置映射。

    排查过程如下:
    先看下kafka的进程信息

    ps -ef |grep kafka
    

    然后看下如下配置信息

    /var/run/cloudera-scm-agent/process/405-kafka-KAFKA_BROKER/kafka.properties
    

    这里看到里面的zk使用了映射的名字
    image.png

    因为zk的连接过程是通过映射名称来通信的。

    ——参考链接

    2024-07-23 15:58:51
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载