Flink1.12写kafka报错,Caused by: org.apache.kafka.common.errors.TimeoutException: Topic tag_change_stream not present in metadata after 60000 ms.
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据报错信息 org.apache.kafka.common.errors.TimeoutException: Topic tag_change_stream not present in metadata after 60000 ms
,可以判断 Flink 在尝试写入 Kafka 时,未能在指定时间内(60秒)获取到目标 Topic 的元数据。这通常是因为目标 Topic 不存在或 Kafka 集群的网络连接存在问题。
结合知识库中的相关资料,该问题可能由以下原因导致: 1. 目标 Topic 未提前创建:Kafka 默认不会自动创建 Topic,如果目标 Topic 不存在,Flink 无法获取其元数据。 2. 网络连通性问题:Flink 和 Kafka 之间的网络可能存在延迟或配置错误,导致无法正确获取 Topic 元数据。 3. Kafka 配置问题:Kafka Broker 的配置可能导致元数据获取超时。
检查 Topic 是否已创建
使用 Kafka 提供的命令行工具检查目标 Topic 是否存在:
kafka-topics.sh --list --bootstrap-server <KAFKA_BROKER_ADDRESS>
如果 tag_change_stream
未出现在列表中,则需要手动创建该 Topic:
kafka-topics.sh --create --topic tag_change_stream --partitions <PARTITION_COUNT> --replication-factor <REPLICATION_FACTOR> --bootstrap-server <KAFKA_BROKER_ADDRESS>
注意:<PARTITION_COUNT>
和 <REPLICATION_FACTOR>
应根据实际需求设置。
启用自动创建 Topic(可选)
如果希望 Kafka 自动创建 Topic,可以在 Kafka Broker 配置文件中设置:
auto.create.topics.enable=true
警告:启用自动创建 Topic 可能导致意外的 Topic 创建,建议仅在测试环境中使用。
验证网络连通性
确保 Flink 能够通过 Kafka Broker 的地址访问 Kafka 集群。可以通过以下方式验证:
ping
或 telnet
测试 Kafka Broker 的可达性:telnet <KAFKA_BROKER_ADDRESS> <KAFKA_PORT>
检查 Kafka Endpoint 配置
根据知识库资料,Kafka Broker 在 Bootstrap 过程中返回的集群 Metadata 中描述的 Endpoint 必须与 Flink 配置一致。可以通过以下步骤确认:
zkCli.sh
get /brokers/ids/<BROKER_ID>
listener_security_protocol_map
中的 Endpoint 是否与 Flink 配置的 Kafka 地址一致。增加元数据获取超时时间
在 Flink 的 Kafka Producer 配置中,增加 request.timeout.ms
和 metadata.max.age.ms
参数的值,以延长元数据获取的超时时间。例如:
request.timeout.ms=120000
metadata.max.age.ms=120000
调整重试次数和缓冲区大小
增加 Kafka Producer 的重试次数和缓冲区大小,以提高容错能力:
retries=5
buffer.memory=67108864
如果使用了 Kafka Connect,确保 Connector 任务的配置正确。根据知识库资料,可以通过以下命令验证参数是否校验通过:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
如果返回的 errors
属性非空,则需要修正对应的参数。
通过以上步骤,应能有效解决 TimeoutException: Topic not present in metadata
的问题。如果问题仍然存在,请进一步检查 Kafka 集群的日志和 Flink 作业的详细日志以定位根本原因。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。