Python 使用python-kafka类库开发kafka生产者&消费者&客户端

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Python 使用python-kafka类库开发kafka生产者&消费者&客户端

使用python-kafka类库开发kafka生产者&消费者&客户端

 By: 授客 QQ:1033553122

 

 

 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

http://kafka.apache.org/downloads.html

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下载地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

说明:实践中发现,pip版本比较旧的话,没法安装whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下载地址1:

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下载地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

说明:

kafka-python支持gzip压缩/解压缩。如果要消费lz4方式压缩的消息,则需要安装python-lz4,如果要支持snappy方式压缩/解压缩则需要安装,否则可能会报错:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

构建生产者对象时,可通过compression_type 参数指定由对应生产者生产的消息数据的压缩方式,或者在producer.properties配置中配置compression.type参数。

 

参考链接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代码实践

生产者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

   producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到单条消息发送完或者超时

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息发送到网络

# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json数据

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

   producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息记录携带header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录(config),编辑server.properties文件,

查找并设置listener,配置监听端口,格式:listeners = listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用参数说明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

 

key_serializer (可调用对象) –用于转换用户提供的key值为字节,必须返回字节数据。 如果为None,则等同调用f(key)。 默认值: None.

 

value_serializer(可调用对象) – 用于转换用户提供的value消息值为字节,必须返回字节数据。 如果为None,则等同调用f(value)。 默认值: None.

 

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 设置消息将要发布到的主题,即消息所属主题

 

value(可选) – 消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)

 

partition (int, 可选) – 指定分区。如果未设置,则使用配置的partitioner

 

key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。如果平partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必须为字节数据或者通过配置的key_serializer序列化后的字节数据.

 

headers (可选) – 设置消息header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)

 

timestamp_ms (int, 可选) –毫秒数 (从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间

 

函数返回FutureRecordMetadata类型的RecordMetadata数据

 

flush(timeout=None)

发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。

 

注意:flush调用不保证记录发送成功

 

metrics(raw=False)

获取生产者性能指标。

 

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

 

注:生产者代码是线程安全的,支持多线程,而消费者则不然

 

消费者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

 

consumer = KafkaConsumer('MY_TOPIC1',

                        bootstrap_servers=['127.0.0.1:9092'],

                        #auto_offset_reset='',
                        auto_offset_reset='latest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费
                        enable_auto_commit=True, # 自动提交消费者的offset
                        auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔
                        group_id='MY_GROUP1',

                        consumer_timeout_ms= 10000, # 如果10秒内kafka中没有可供消费的数据,自动退出
                        client_id='consumer-python3'
                        )

 

for msg in consumer:

   print (msg)

   print('topic: ', msg.topic)

   print('partition: ', msg.partition)

   print('key: ', msg.key, 'value: ', msg.value)

   print('offset:', msg.offset)

   print('headers:', msg.headers)

 

# Get consumer metrics

metrics = consumer.metrics()

print(metrics)

 

运行效果

 

 

 

通过assign、subscribe两者之一为消费者设置消费的主题

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                        auto_offset_reset='latest',

                        enable_auto_commit=True, # 自动提交消费数据的offset

                        consumer_timeout_ms= 10000, # 如果1秒内kafka中没有可供消费的数据,自动退出

                        value_deserializer=lambda m: json.loads(m.decode('ascii')), #消费json 格式的消息

                        client_id='consumer-python3'

                        )

 

 

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

 

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

   print (msg)

 

 

API及常用参数说明:

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。

 

client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’

 

group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None

 

auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。

 

enable_auto_commit (bool) –  如果为True,将自动定时提交消费者offset。默认为True。

 

auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。

 

value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value

 

subscribe(topics=(), pattern=None, listener=None)

订阅需要的主题

topics (list) – 需要订阅的主题列表

pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。

 

metrics(raw=False)

获取消费者性能指标。

 

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

客户端

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka.client import KafkaClient

 

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

 

# 获取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

   print('broker: ', broker)

   print('broker nodeId: ', broker.nodeId)

 

# 获取主题的所有分区

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

 

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

 

 

运行结果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

 

API及常用参数说明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

 

client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’

 

request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

 

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

 

brokers()

获取所有broker元数据

 

available_partitions_for_topic(topic)

返回主题的所有分区

 

 

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

 

 

目录
相关文章
|
2月前
|
存储 监控 算法
淘宝买家秀 API开发实录Python(2025)
本文讲述了作者在电商开发领域,尤其是对接淘宝买家秀 API 接口过程中所经历的挑战与收获。从申请接入、签名验证、频率限制到数据处理和实时监控,作者分享了多个实战经验与代码示例,帮助开发者更高效地获取和处理买家秀数据,提升开发效率。
|
5月前
|
API C++ 开发者
PySide vs PyQt:Python GUI开发史诗级对决,谁才是王者?
PySide 和 PyQt 是 Python GUI 开发领域的两大利器,各有特色。PySide 采用 LGPL 协议,更灵活;PyQt 默认 GPL,商业使用需授权。两者背后团队实力雄厚,PySide 得到 Qt 官方支持,PyQt 由 Riverbank Computing 打造。API 设计上,PySide 简洁直观,贴近原生 Qt;PyQt 增加 Pythonic 接口,操作更高效。性能方面,两者表现优异,适合不同需求的项目开发。选择时可根据项目特点与开源要求决定。
412 20
|
1月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
297 7
|
2月前
|
算法 程序员 API
电商程序猿开发实录:淘宝商品python(2)
本文分享了开发者在对接淘宝商品详情API过程中的真实经历,涵盖权限申请、签名验证、限流控制、数据解析及消息订阅等关键环节,提供了实用的Python代码示例,帮助开发者高效调用API,提升系统稳定性与数据处理能力。
|
3月前
|
数据采集 存储 数据库
Python爬虫开发:Cookie池与定期清除的代码实现
Python爬虫开发:Cookie池与定期清除的代码实现
|
4月前
|
人工智能 搜索推荐 数据可视化
用 Python 制作简单小游戏教程:手把手教你开发猜数字游戏
本教程详细讲解了用Python实现经典猜数字游戏的完整流程,涵盖从基础规则到高级功能的全方位开发。内容包括游戏逻辑设计、输入验证与错误处理、猜测次数统计、难度选择、彩色输出等核心功能,并提供完整代码示例。同时,介绍了开发环境搭建及调试方法,帮助初学者快速上手。最后还提出了图形界面、网络对战、成就系统等扩展方向,鼓励读者自主创新,打造个性化游戏版本。适合Python入门者实践与进阶学习。
450 1
|
6月前
|
程序员 测试技术 开发工具
怎么开发Python第三方库?手把手教你参与开源项目!
大家好,我是程序员晚枫。本文将分享如何开发Python第三方库,并以我维护的开源项目 **popdf** 为例,指导参与开源贡献。Popdf是一个PDF操作库,支持PDF转Word、转图片、合并与加密等功能。文章涵盖从fork项目、本地开发、单元测试到提交PR的全流程,适合想了解开源贡献的开发者。欢迎访问[popdf](https://gitcode.com/python4office/popdf),一起交流学习!
227 21
怎么开发Python第三方库?手把手教你参与开源项目!
|
4月前
|
存储 算法 数据可视化
用Python开发猜数字游戏:从零开始的手把手教程
猜数字游戏是编程入门经典项目,涵盖变量、循环、条件判断等核心概念。玩家通过输入猜测电脑生成的随机数,程序给出提示直至猜中。项目从基础实现到功能扩展,逐步提升难度,适合各阶段Python学习者。
230 0
|
4月前
|
数据采集 存储 监控
抖音直播间采集提取工具,直播间匿名截流获客软件,Python开发【仅供学习】
这是一套基于Python开发的抖音直播间数据采集与分析系统,包含观众信息获取、弹幕监控及数据存储等功能。代码采用requests、websockets和sqlite3等...
|
6月前
|
JavaScript 前端开发 关系型数据库
基于Python+Vue开发的体育场馆预约管理系统源码+运行
本项目为大学生课程设计作业,采用Python和Vue技术构建了一个体育场馆预约管理系统(实现前后端分离)。系统的主要目标在于帮助学生理解和掌握Python编程知识,同时培养其项目规划和开发能力。参与该项目的学习过程,学生能够在实际操作中锻炼技能,为未来的职业发展奠定良好的基础。
152 3

推荐镜像

更多