python玩玩kafka

简介: python玩玩kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

kafka里面的一些概念:

  • producer:生产者。
  • consumer:消费者。
  • topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
  • broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka有四个核心API:producer API,consumer  API,streams API,connector API


kafka有什么用?

可它以有效的获取系统和应用程序之间的数据,对数据流进行转换或者反应。


关于kafka的下载安装就不过多介绍了,下面主要介绍的是使用python操作kafka。


首先安装kafka的模块:

pip install kafka


安装完我们就可以尝试着去跑个例子:

首先看看producer是怎么跑起来的:


from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()

调用KafkaProducer指定server地址即可


类似的来看看consumer例子:


from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
                         bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

对于consumer group(消费者群组),我们需要给一个群组id(用来区分单个消费者或是群组):


from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

使用consumer订阅多个主题,需要使用subscribe方法,传入需要订阅的标题:


from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

如果需要手动拉取信息,那我们需要加一个循环,在这个循环里监听,一直获取服务器信息:


from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1','topic2','top3'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg


如果想挂起consumer可以调用pause()方法,恢复调用resume()方法:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('topic1'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"



关于简单的操作就介绍到这里了,想了解更多:

https://pypi.org/project/kafka-python/


相关文章
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
473 2
|
数据采集 数据挖掘 关系型数据库
2024年5分钟就能完成的5个Python小项目,赶紧拿去玩玩吧(2),2024年最新腾讯面试题
2024年5分钟就能完成的5个Python小项目,赶紧拿去玩玩吧(2),2024年最新腾讯面试题
2024年5分钟就能完成的5个Python小项目,赶紧拿去玩玩吧(2),2024年最新腾讯面试题
|
消息中间件 Kafka Python
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
145 3
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
存储 消息中间件 数据挖掘
Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。
【7月更文挑战第5天】Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。示例代码展示了从Kafka消费数据,计算社交媒体活跃度和物联网设备状态,并可视化结果。适用于监控、故障检测等场景。通过学习和实践,提升实时数据分析能力。
347 0
|
消息中间件 Kafka Python
python如何使用kafka
python如何使用kafka
545 0
|
消息中间件 Kafka Python
Python:kafka基本操作
Python:kafka基本操作
176 0
|
18天前
|
数据采集 机器学习/深度学习 人工智能
Python:现代编程的首选语言
Python:现代编程的首选语言
188 102
|
18天前
|
数据采集 机器学习/深度学习 算法框架/工具
Python:现代编程的瑞士军刀
Python:现代编程的瑞士军刀
191 104

热门文章

最新文章

推荐镜像

更多