前言
最近在搭建数据实时捕获和传输管道(CDC)时用到了kafka,首先在阿里云ECS上搭建了一个最简单的kafka实例,因为是阿里云内网ECS,经测试无误后通过公网映射出来到公司网络环境提供服务,主要后面需要将DTS中的数据塞到kafka提供到本地调试。但是因为对kafka不了解,在本地连接kafka时遇到了各种网络问题。
问题集
如果Topic不存在时,通过kafka-client发送消息,会抛如下异常:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:1186)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:880)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690)
at com.keking.midplatform.jobs.demo.KafkaProducerDemo.main(KafkaProducerDemo.java:39)
AI 代码解读
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
这个时候检查Topic已经存在了,而且异常是超时异常,所以再次运行,然后抛如下异常了:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for fullfillment-2: 30032 ms has passed since batch creation plus linger time
error occurred
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.keking.midplatform.jobs.demo.KafkaProducerDemo.main(KafkaProducerDemo.java:41)
AI 代码解读
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for fullfillment-2: 30032 ms has passed since batch creation plus linger time
解决问题
在kafka的配置中有如下配置
advertised.listeners:=PLAINTEXT://xxxx:9092,默认是不开启的,官方说明如下
Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for `listeners` will be used. Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address.
译文:监听器发布到ZooKeeper以供客户端使用,如果不同于`listeners`配置属性。 在IaaS环境中,这可能需要与代理绑定的接口不同。 如果未设置,则将使用“listeners”的值。 与`listeners`不同,宣传0.0.0.0元地址是无效的。
大概意思就是如果提供服务给外网访问,需要配置代理的ip和端口。而我们正是这个场景,最后通过配置advertised.listeners成网关的ip+映射的端口,解决了问题
配置说明详情参考:http://kafka.apache.org/20/documentation.html#brokerconfigs
结语
这个场景很常见,遇到的人应该会很多,给他人一个参考吧