如何使用Python连接Kafka?

如何使用Python连接Kafka?

展开
收起
邂逅青青 2024-08-08 10:39:38 66 发布于浙江 分享
分享
版权
举报
1 条回答
写回答
取消 提交回答
  • 首先你是需要安装
    pip install kafka-python

    然后python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区

    代码如下

    from confluent_kafka import Consumer, KafkaError
    
    # 配置信息,请根据实际情况替换
    conf = {
        'bootstrap.servers': 'your.bootstrap.servers',  # Kafka服务器地址
        'group.id': 'your-group-id',  # 消费者组ID
        'auto.offset.reset': 'latest'  # 自动偏移重置策略,latest表示从最新的消息开始消费
    }
    
    c = Consumer(conf)
    c.subscribe(['your-topic-name'])  # 订阅的主题名称
    
    while True:
        msg = c.poll(1.0)  # 阻塞等待消息,超时时间1秒
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Consumer error: {msg.error()}")
                continue
        print(f'Received message: {msg.value().decode("utf-8")}')
    
    c.close()
    

    连接工具

    image.png


    参考文档

    2024-08-30 17:31:43 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

还有其他疑问?
咨询AI助理
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等