from kafka import KafkaConsumer
# import redis
import time
from kafka import KafkaProducer
bootstrap_servers = ['1*2.1*.*4.27:9092', '1*2.1*.*4.27:9092', '172.17.64.29:9092']
group_id = 'abcdefg'
topic = 'test'
consumer = KafkaConsumer(group_id=group_id,
bootstrap_servers=bootstrap_servers)
consumer.subscribe(topics=(topic,))
for msg in consumer:
print(msg)
执行时报错
Traceback (most recent call last):
File "a.py", line 15, in <module>
bootstrap_servers=bootstrap_servers)
File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/anaconda/lib/python3.6/site-packages/kafka/client_async.py", line 828, in check_version
conn = self._conns[try_node]
KeyError: 1
解决:
配置文件中:server.properties
*
listeners:kafka的连接协议名、主机名和端口,如果没有配置,将使用java.net.InetAddress.getCanonicalHostName()的返回值作为主机名
*
advertised.listeners:生产者和消费者使用的主机名和端口,如果没有配置,将使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()的返回值
本地请求时获取的主机名为线上主机名,而本地主机名和线上主机名相同,都为node-1 ,通过/etc/hosts映射为本地ip,然后通过本地ip无法获取有效信息,此时只需再server.properties添加listeners=PLAINTEXT://12.1.*4.27:9092 -- 此时都填写ip,若填写hostname,则本地必须也有相同的ip映射
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://1*2.1*.*4.27:9092```