Python 基于pykafka简单实现KAFKA消费者

本文涉及的产品
云原生网关 MSE Higress,422元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: Python 基于pykafka简单实现KAFKA消费者

基于pykafka简单实现KAFKA消费者

 

 

 

 

 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

http://kafka.apache.org/downloads.html

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pykafka-2.8.0.tar.gz

下载地址1:

https://pypi.org/project/pykafka/

https://files.pythonhosted.org/packages/55/4b/4828ec5ed766cca0c27de234688122494c5762965e70deeb88b84f5d8d98/pykafka-2.8.0.tar.gz

 

 

2.问题描述

使用python-kafka类库实现kafka消费者时,发现程序有时候会自动停止消费,对一些参数进行配置后无果,换成pykafka类库实现,搞定

 

 

3.代码简单实现

 

 

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from pykafka import KafkaClient

 

 

client = KafkaClient(hosts="127.0.0.1:9092")

 

# 获取主题

print(client.topics)

 

topic = client.topics['MY_TOPIC1']

 

# 获取消费者

consumer  = topic.get_balanced_consumer('MY_GROUP1', auto_commit_enable=True, auto_commit_interval_ms=3000)

for message in consumer:

   if message is not None:

       print(message.offset, message.value)

 

 

 

参考链接:

https://pykafka.readthedocs.io/en/latest/index.html

目录
相关文章
|
4月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
224 16
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
184 10
|
7月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
151 1
|
9月前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费者行为分析的深度学习模型
使用Python实现智能食品消费者行为分析的深度学习模型
256 4
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
191 62
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
231 58
|
10月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
301 2
|
11月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
136 1
|
12月前
|
消息中间件 Kafka Python
|
12月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
135 3

热门文章

最新文章

推荐镜像

更多