首先你是需要安装
pip install kafka-python
然后python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区
代码如下
from confluent_kafka import Consumer, KafkaError
# 配置信息,请根据实际情况替换
conf = {
'bootstrap.servers': 'your.bootstrap.servers', # Kafka服务器地址
'group.id': 'your-group-id', # 消费者组ID
'auto.offset.reset': 'latest' # 自动偏移重置策略,latest表示从最新的消息开始消费
}
c = Consumer(conf)
c.subscribe(['your-topic-name']) # 订阅的主题名称
while True:
msg = c.poll(1.0) # 阻塞等待消息,超时时间1秒
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f"Consumer error: {msg.error()}")
continue
print(f'Received message: {msg.value().decode("utf-8")}')
c.close()
连接工具
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/