如何使用Python读写Kafka?

简介: 如何使用Python读写Kafka?

摄影:产品经理吃了不会秃头的秃黄油

关于Kafka的第三篇文章,我们来讲讲如何使用Python读写Kafka。这一篇文章里面,我们要使用的一个第三方库叫做kafka-python。大家可以使用pip或者pipenv安装它。下面两种安装方案,任选其一即可。

python3 -m pip install kafka-python
pipenv install kafka-python

如下图所示:

这篇文章,我们将会使用最短的代码来实现一个读、写Kafka的示例。

创建配置文件

由于生产者和消费者都需要连接Kafka,所以我单独写了一个配置文件config.py用来保存连接Kafka所需要的各个参数,而不是直接把这些参数Hard Code写在代码里面:

# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'

本文演示所用的Kafka由我司平台组的同事搭建,需要账号密码才能连接,所以我在配置文件中加上了USERNAMEPASSWORD两项。你使用的Kafka如果没有账号和密码,那么你只需要SERVERTOPIC即可。

创建生产者

代码简单到甚至不需要解释。首先使用KafkaProducer类连接 Kafka,获得一个生产者对象,然后往里面写数据。

import json
import time
import datetime
import config
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())
for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

参数bootstrap_servers用于指定 Kafka 的服务器连接地址。

参数value_serializer用来指定序列化的方式。这里我使用 json 来序列化数据,从而实现我向 Kafka 传入一个字典,Kafka 自动把它转成 JSON 字符串的效果。

如下图所示:

注意,上图中,我多写了4个参数:

security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD

这四个参数是因为我这里需要通过密码连接 Kafka 而加上的,如果你的 Kafka 没有账号密码,就不需要这四个参数。

创建消费者

Kafka 消费者也需要连接 Kafka,首先使用KafkaConsumer类初始化一个消费者对象,然后循环读取数据。代码如下:

import config
from kafka import KafkaConsumer
consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

KafkaConsumer 的第一个参数用于指定 Topic。你可以把这个 Topic 理解成 Redis 的 Key。

bootstrap_servers用于指定 Kafka 服务器连接地址。

group_id这个参数后面的字符串可以任意填写。如果两个程序的Topicgroup_id相同,那么他们读取的数据不会重复,两个程序的Topic相同,但是group_id不同,那么他们各自消费全部数据,互不影响。

auto_offset_rest 这个参数有两个值,earliestlatest,如果省略这个参数,那么默认就是latest。这个参数会单独介绍。这里先略过。

连接好 Kafka 以后,直接对消费者对象使用 for 循环迭代,就能持续不断获取里面的数据了。

运行演示

运行两个消费者程序和一个生产者程序,效果如下图所示。

我们可以看到,两个消费者程序读取数据不重复,不遗漏。

当所有数据都消费完成以后,如果你把两个消费者程序关闭,再运行其中一个,你会发现已经没有数据会被打印出来了。

但如果你修改一下 group_id,程序又能正常从头开始消费了,如下图所示:

很多人都会搞混的几个地方

earliest 与 latest

在我们创建消费者对象的时候,有一个参数叫做auto_offset_reset='earliest'。有人看到earliestlatest,想当然地认为设置为earliest,就是从 Topic 的头往后读,设置为latest就是忽略之前的数据,从程序运行以后,新来的数据开始读。

这种看法是不正确的。

auto_offset_reset这个参数,只有在一个group第一次运行的时候才有作用,从第二次运行开始,这个参数就失效了。

假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test2auto_offset_reset设置为 earliest。那么当你的消费者运行的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。

假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test3auto_offset_reset设置为 latest。那么当你的消费者运行的时候,Kafka 不会给你返回任何数据,消费者看起来就像卡住了一样,但是 Kafka 会直接强制把前100条数据的状态设置为已经被你消费的状态。所以当前你的 offset 就直接是99了。直到生产者插入了一条新的数据,此时消费者才能读取到。这条新的数据对应的 offset 就变成了100。

假设现在你的 Topic 里面有100个数据,你设置了一个全新的 group_id 为test4auto_offset_reset设置为 earliest。那么当你的消费者运行的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。等消费到第50条数据时,你把消费者程序关了,把auto_offset_reset设置为latest,再重新运行。此时消费者依然会接着从第51条数据开始读取。不会跳过剩下的50条数据。

所以,auto_offset_reset的作用,是在你的 group 第一次运行,还没有 offset 的时候,给你设定初始的 offset。而一旦你这个 group 已经有 offset 了,那么auto_offset_reset这个参数就不会再起作用了。

partition 是如何分配的?

对于同一个 Topic 的同一个 Group:

假设你的 Topic 有10个  Partition,一开始你只启动了1个消费者。那么这个消费者会轮换着从这10个Partition 中读取数据。

当你启动第二个消费者时,Kafka 会从第一个消费者手上抢走5个Partition,分给第二个消费者。于是两个消费者各自读5个 Partition。互不影响。

当第三个消费者又出现时,Kafka 从第一个消费者手上再抢走1个 Partition,从第二个消费者手上抢走2个 Partition 给第三个消费者。于是,消费者1有4个 Partition,消费者2有3个 Partition,消费者3有3个 Partiton,互不影响。

当你有10个消费者一起消费时,每个消费者读取一个 Partition,互不影响。

当第11个消费者出现时,它由于分配不到 Partition,所以它什么都读不到。

所以在上一篇文章中,我说,在同一个 Topic,同一个 Group 中,你有多少个 Partiton,就能起多少个进程同时消费。

Kafka 是不是完全不重复不遗漏?

在极端情况下,Kafka 会重复,也会遗漏,但是这种极端情况并不常见。如果你的 Kafka 频繁漏数据,或者总是出现重复数据,那么肯定是你环境没有搭建正确,或者代码有问题。

目录
相关文章
|
2月前
|
数据挖掘 Python
🚀告别繁琐!Python I/O管理实战,文件读写效率飙升的秘密
在日常编程中,高效的文件I/O管理对提升程序性能至关重要。Python通过内置的`open`函数及丰富的库简化了文件读写操作。本文从基本的文件读写入手,介绍了使用`with`语句自动管理文件、批量读写以减少I/O次数、调整缓冲区大小、选择合适编码格式以及利用第三方库(如pandas和numpy)等技巧,帮助你显著提升文件处理效率,让编程工作更加高效便捷。
43 0
|
17天前
|
Python
python读写操作excel日志
主要是读写操作,创建表格
48 2
|
2月前
|
消息中间件 Kafka Python
|
2月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
25 3
|
29天前
|
索引 Python
Excel学习笔记(一):python读写excel,并完成计算平均成绩、成绩等级划分、每个同学分数大于70的次数、找最优成绩
这篇文章是关于如何使用Python读取Excel文件中的学生成绩数据,并进行计算平均成绩、成绩等级划分、统计分数大于70的次数以及找出最优成绩等操作的教程。
53 0
|
3月前
|
SQL JSON 关系型数据库
n种方式教你用python读写excel等数据文件
n种方式教你用python读写excel等数据文件
|
4月前
|
数据挖掘 Python
🚀告别繁琐!Python I/O管理实战,文件读写效率飙升的秘密
【7月更文挑战第29天】在 Python 编程中,高效的文件 I/O 对性能至关重要。
49 4
|
3月前
|
Python
Python:读写操作
【8月更文挑战第20天】
36 0
|
3月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
存储 对象存储 Python
`openpyxl`是一个用于读写Excel 2010 xlsx/xlsm/xltx/xltm文件的Python库。它不需要Microsoft Excel,也不需要.NET或COM组件。
`openpyxl`是一个用于读写Excel 2010 xlsx/xlsm/xltx/xltm文件的Python库。它不需要Microsoft Excel,也不需要.NET或COM组件。
下一篇
无影云桌面