python3.6操作kafka, 生产者消费者队列

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: python3.6操作kafka, 生产者消费者队列

介绍一下使用场景, 我这边之前使用redis做生产者消费者队列, 然后因为redis容量不大, 升级成本也比较高, 所以就拿kafka用来做消息队列, 因为数据是及时生产及时消费的, 所以说也就没有用太深, 拿topic当redis的key用的


后续测试了一下, 用kafka的速度要比pykafka的速度快10倍左右, 代码也比较简便, 所以说还是用kafka连接吧, 示例代码:

from kafka import KafkaConsumer
from kafka import KafkaProducer
server_list = [ "192.168.0.1:xxxx","192.168.0.2:xxxx"]
# 生产者
producer = KafkaProducer(bootstrap_servers=server_list, compression_type='gzip')
msg = {"name": "小飞1", "text": "测试1"}
bmsg = bytes(str(msg).encode('utf-8'))
producer.send('xiaofei_test', bmsg)
# 消费者
consumer = KafkaConsumer('xiaofei_test', auto_offset_reset='earliest', bootstrap_servers=server_list)
print(consumer)
for msg in consumer:
    print(msg)

生产者(pykafka)

from pykafka import KafkaClient
import json
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print(client.topics)
key = "test"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
##因为是简单使用, 所以没有分组, 只是用topic 当redis中的key使用
producer = topic.get_producer(sync=True)
producer.start()
print(producer)
# 生产消息
msg_dict ={"test": "测试数据"}
msg = json.dumps(msg_dict)
with topic.get_sync_producer() as producer:
    producer.produce(bytes(msg, encoding='utf8'))
    print(msg)
    print('插入成功')

消费者(pykafka)

from pykafka import KafkaClient
hosts = "192.168.0.1:xxxx,192.168.0.2:xxxx"
client = KafkaClient(hosts=hosts)
print("Kafka client:", client.topics)
# 消费者
key = "chengdu-cdfgjtj-research_details"
key = bytes(key, encoding='utf8')
topic = client.topics[key]
#一些参数信息可以看一下 https://www.cnblogs.com/jun1019/p/6656223.html
consumer = topic.get_simple_consumer(auto_commit_enable=True)
# consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, consumer_id='test')
for message in consumer:
    if message is not None:
        print("consumer message:", message.offset)
        print(message.value)


目录
相关文章
|
1天前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
1天前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
48 0
|
1天前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
79 4
|
1天前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
99 2
|
1天前
|
数据采集 安全 Python
python并发编程:Python实现生产者消费者爬虫
python并发编程:Python实现生产者消费者爬虫
30 0
python并发编程:Python实现生产者消费者爬虫
|
1天前
|
Python
Python实现数据结构(如:链表、栈、队列等)。
Python实现数据结构(如:链表、栈、队列等)。
258 0
|
1天前
|
存储 缓存 算法
Python中collections模块的deque双端队列:深入解析与应用
在Python的`collections`模块中,`deque`(双端队列)是一个线程安全、快速添加和删除元素的双端队列数据类型。它支持从队列的两端添加和弹出元素,提供了比列表更高的效率,特别是在处理大型数据集时。本文将详细解析`deque`的原理、使用方法以及它在各种场景中的应用。
|
1天前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
263 4
|
1天前
|
前端开发 Python
Python中如何用栈实现队列
Python中如何用栈实现队列
214 0
|
1天前
|
消息中间件 缓存 安全
Kafka 的生产者优秀架构设计
Kafka 的生产者优秀架构设计
29 0