kafka外网访问疑云

简介: 前言最近在搭建数据实时捕获和传输管道(CDC)时用到了kafka,首先在阿里云ECS上搭建了一个最简单的kafka实例,因为是阿里云内网ECS,经测试无误后通过公网映射出来到公司网络环境提供服务,主要后面需要将DTS中的数据塞到kafka提供到本地调试。

kafka外网访问疑云

前言

最近在搭建数据实时捕获和传输管道(CDC)时用到了kafka,首先在阿里云ECS上搭建了一个最简单的kafka实例,因为是阿里云内网ECS,经测试无误后通过公网映射出来到公司网络环境提供服务,主要后面需要将DTS中的数据塞到kafka提供到本地调试。但是因为对kafka不了解,在本地连接kafka时遇到了各种网络问题。

问题集

如果Topic不存在时,通过kafka-client发送消息,会抛如下异常:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:1186)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:880)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690)
at com.keking.midplatform.jobs.demo.KafkaProducerDemo.main(KafkaProducerDemo.java:39)
AI 代码解读

Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

这个时候检查Topic已经存在了,而且异常是超时异常,所以再次运行,然后抛如下异常了:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for fullfillment-2: 30032 ms has passed since batch creation plus linger time
error occurred

at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.keking.midplatform.jobs.demo.KafkaProducerDemo.main(KafkaProducerDemo.java:41)
AI 代码解读

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for fullfillment-2: 30032 ms has passed since batch creation plus linger time

解决问题

在kafka的配置中有如下配置

advertised.listeners:=PLAINTEXT://xxxx:9092,默认是不开启的,官方说明如下

Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for `listeners` will be used. Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address.

译文:监听器发布到ZooKeeper以供客户端使用,如果不同于`listeners`配置属性。 在IaaS环境中,这可能需要与代理绑定的接口不同。 如果未设置,则将使用“listeners”的值。 与`listeners`不同,宣传0.0.0.0元地址是无效的。

大概意思就是如果提供服务给外网访问,需要配置代理的ip和端口。而我们正是这个场景,最后通过配置advertised.listeners成网关的ip+映射的端口,解决了问题

配置说明详情参考:http://kafka.apache.org/20/documentation.html#brokerconfigs

结语

这个场景很常见,遇到的人应该会很多,给他人一个参考吧

目录
打赏
0
0
0
0
12
分享
相关文章
【kafka可视化工具】kafka-eagle在windows环境的下载、安装、启动与访问
【kafka可视化工具】kafka-eagle在windows环境的下载、安装、启动与访问
1084 0
Kafka配置公网访问,直接暴露方式与nginx代理方式(绝对没问题)
Kafka配置公网访问,直接暴露方式与nginx代理方式(绝对没问题)
5174 1
K8S环境快速部署Kafka(K8S外部可访问)
本文通过实战展示了如何在K8S环境部署kafka集群,并且K8S环境外部也能使用此服务
3218 1
K8S环境快速部署Kafka(K8S外部可访问)
Java 客户端访问kafka
Java 客户端访问kafka
56 9
kafka 0.11x 启动30秒后自动停止,报【另一个程序正在使用此文件,进程无法访问】
环境:kafka_2.11-1.1.0,win7_64,java8 现象: 启动30秒后自动停止,报【另一个程序正在使用此文件,进程无法访问】 [2018-06-06 14:32:46,784] INFO [Log partition=myTopic-0, dir=D:\kafka_2.
2921 0
CentOS6.9安装Filebeat监控Nginx的访问日志发送到Kafka
一、下载地址: 官方:https://www.elastic.co/cn/downloads/beats/filebeat 百度云盘:https://pan.baidu.com/s/1dvhqb0   二、安装 tar zxvf filebeat-6.
3017 0
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。

相关实验场景

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等