python3.6操作kafka, 生产者消费者队列

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: python3.6操作kafka, 生产者消费者队列

介绍一下使用场景, 我这边之前使用redis做生产者消费者队列, 然后因为redis容量不大, 升级成本也比较高, 所以就拿kafka用来做消息队列, 因为数据是及时生产及时消费的, 所以说也就没有用太深, 拿topic当redis的key用的


后续测试了一下, 用kafka的速度要比pykafka的速度快10倍左右, 代码也比较简便, 所以说还是用kafka连接吧, 示例代码:

from kafka import KafkaConsumer
from kafka import KafkaProducer
server_list = [ "192.168.0.1:xxxx","192.168.0.2:xxxx"]
# 生产者
producer = KafkaProducer(bootstrap_servers=server_list, compression_type='gzip')
msg = {"name": "小飞1", "text": "测试1"}
bmsg = bytes(str(msg).encode('utf-8'))
producer.send('xiaofei_test', bmsg)
# 消费者
consumer = KafkaConsumer('xiaofei_test', auto_offset_reset='earliest', bootstrap_servers=server_list)
print(consumer)
for msg in consumer:
    print(msg)

生产者(pykafka)

from pykafka import KafkaClient
import json
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print(client.topics)
key = "test"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
##因为是简单使用, 所以没有分组, 只是用topic 当redis中的key使用
producer = topic.get_producer(sync=True)
producer.start()
print(producer)
# 生产消息
msg_dict ={"test": "测试数据"}
msg = json.dumps(msg_dict)
with topic.get_sync_producer() as producer:
    producer.produce(bytes(msg, encoding='utf8'))
    print(msg)
    print('插入成功')

消费者(pykafka)

from pykafka import KafkaClient
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print("Kafka client:", client.topics)
# 消费者
key = "chengdu-cdfgjtj-research_details"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
#一些参数信息可以看一下 https://www.cnblogs.com/jun1019/p/6656223.html
consumer = topic.get_simple_consumer(auto_commit_enable=True)
# consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, consumer_id='test')
for message in consumer:
    if message is not None:
        print("consumer message:", message.offset)
        print(message.value)


目录
相关文章
|
21天前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
109 61
|
20天前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
|
17天前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
|
3月前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费者行为分析的深度学习模型
使用Python实现智能食品消费者行为分析的深度学习模型
168 4
|
4月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
143 2
|
5月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
83 1
|
6月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
6月前
|
消息中间件 Kafka Python
|
6月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
65 3
|
7月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能