开发者社区 > 云原生 > 云消息队列 > 正文

如何使用Python连接Kafka?

如何使用Python连接Kafka?

展开
收起
邂逅青青 2024-08-08 10:39:38 35 0
1 条回答
写回答
取消 提交回答
  • 首先你是需要安装
    pip install kafka-python

    然后python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区

    代码如下

    from confluent_kafka import Consumer, KafkaError
    
    # 配置信息,请根据实际情况替换
    conf = {
        'bootstrap.servers': 'your.bootstrap.servers',  # Kafka服务器地址
        'group.id': 'your-group-id',  # 消费者组ID
        'auto.offset.reset': 'latest'  # 自动偏移重置策略,latest表示从最新的消息开始消费
    }
    
    c = Consumer(conf)
    c.subscribe(['your-topic-name'])  # 订阅的主题名称
    
    while True:
        msg = c.poll(1.0)  # 阻塞等待消息,超时时间1秒
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Consumer error: {msg.error()}")
                continue
        print(f'Received message: {msg.value().decode("utf-8")}')
    
    c.close()
    

    连接工具

    image.png


    参考文档

    2024-08-30 17:31:43
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 Kafka 版
  • 相关电子书

    更多
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载
    Improving Python and Spark 立即下载