kafka连接失败

简介: kafka连接报错java.net.InetAddress.getCanonicalHostName
from kafka import KafkaConsumer
# import redis
import time
from kafka import KafkaProducer
bootstrap_servers = ['1*2.1*.*4.27:9092', '1*2.1*.*4.27:9092', '172.17.64.29:9092']
group_id = 'abcdefg'
topic = 'test'

consumer = KafkaConsumer(group_id=group_id,
                         bootstrap_servers=bootstrap_servers)
consumer.subscribe(topics=(topic,))
for msg in consumer:
    print(msg)

执行时报错

Traceback (most recent call last):
  File "a.py", line 15, in <module>
    bootstrap_servers=bootstrap_servers)
  File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/client_async.py", line 828, in check_version
    conn = self._conns[try_node]
KeyError: 1

解决:
配置文件中:server.properties

* 

listeners:kafka的连接协议名、主机名和端口,如果没有配置,将使用java.net.InetAddress.getCanonicalHostName()的返回值作为主机名

* 

advertised.listeners:生产者和消费者使用的主机名和端口,如果没有配置,将使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()的返回值

本地请求时获取的主机名为线上主机名,而本地主机名和线上主机名相同,都为node-1 ,通过/etc/hosts映射为本地ip,然后通过本地ip无法获取有效信息,此时只需再server.properties添加listeners=PLAINTEXT://12.1.*4.27:9092 -- 此时都填写ip,若填写hostname,则本地必须也有相同的ip映射


############################# Socket Server Settings #############################


# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://1*2.1*.*4.27:9092```  
相关文章
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
消息中间件 Kafka 网络安全
Conduktor连接阿里云Kafka集群
Conduktor是一款商业化的Apache Kafka Connector,可以使用该工具连接Kafka Cluster,方便对集群信息如Topic,Group,Partition,Offset能信息的在线管理的查看,本文主要在Windows10环境下演示该工具的下载以及如果连接阿里云上的Kafka集群。
1182 0
Conduktor连接阿里云Kafka集群
|
2月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
4月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
26 0
|
2月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
3月前
|
消息中间件 监控 Java
查询Kafka生产者是否连接到Kafka服务
查询Kafka生产者是否连接到Kafka服务
184 2
|
4月前
|
消息中间件 算法 Java
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
|
5月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错之连接外部kafka本地执行测试代码报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 弹性计算 Kafka
数据传输服务DTS的Kafka连接问题可能有以下几个原因
数据传输服务DTS的Kafka连接问题可能有以下几个原因
102 2