python之Kafka

简介: python之Kafka

1.安装kafka环境

# 看这个地址

# 你还需要装Java环境
# 测试启动如果启动成功,那么证明kafka启动成功
.\bin\windows\zookeeper-server-start.bat  .\config\zookeeper.properties
#启动kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties

# 创建top
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# 消费者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

2.pykafka

 2,1安装

 pip install pykafka

 2.2生产者

from pykafka import KafkaClient
from conf import KAFKA_HOSTS_LOCALHOST
#连接kafka客户端
kafka_client = KafkaClient(hosts=KAFKA_HOSTS_LOCALHOST)
#获取topic
topic = kafka_client.topics["test"]
#获取生产者对象
produce = topic.get_producer()
#传数据必须是字节
produce.produce("now_time_bytes".encode("utf8"))
#手动关闭该生产者
produce.stop()

2.3消费者

# 导入安装包
from pykafka import KafkaClient
# 设置客户端的连接信息
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
# print(client.topics)
# print(topic.latest_available_offsets())
#consumer_group 与consumer_id值不能一样,不同group相互独立
consumer  = topic.get_simple_consumer(
    consumer_group='18',
    auto_commit_enable=True,
    auto_commit_interval_ms=1,
    reset_offset_on_start=True
    # consumer_id =1,
)


for message in consumer:
    if message is not None:
        print(message.offset, message.value)
相关文章
|
4月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
143 2
|
1天前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
|
1月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
存储 消息中间件 数据挖掘
Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。
【7月更文挑战第5天】Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。示例代码展示了从Kafka消费数据,计算社交媒体活跃度和物联网设备状态,并可视化结果。适用于监控、故障检测等场景。通过学习和实践,提升实时数据分析能力。
80 0
|
4月前
|
消息中间件 Kafka Python
python如何使用kafka
python如何使用kafka
232 0
|
消息中间件 Kafka API
python玩玩kafka
python玩玩kafka
116 0
|
消息中间件 Kafka Python
kafka关键原理及python调用kafka示例
kafka关键原理及python调用kafka示例
102 0
|
消息中间件 JSON NoSQL
如何使用Python读写Kafka?
如何使用Python读写Kafka?
319 0
|
消息中间件 Kafka 测试技术
Python 基于pykafka简单实现KAFKA消费者
Python 基于pykafka简单实现KAFKA消费者
147 0
|
1月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
74 9