使用生成器把Kafka写入速度提高1000倍

简介: 使用生成器把Kafka写入速度提高1000倍

[如果代码显示有问题,请点击阅读原文]

通过本文你会知道Python里面什么时候用yield最合适。本文不会给你讲生成器是什么,所以你需要先了解Python的yield,再来看本文。

疑惑

多年以前,当我刚刚开始学习Python协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部可以随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:

import time
def consumer():
    product = None
    while True:
        if product is not None:
            print('consumer: {}'.format(product))
        product = yield None
def producer():
    c = consumer()
    next(c)
    for i in range(10):
        c.send(i)
start = time.time()
producer()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

运行效果如下图所示。

这些文章的说法,就像统一好了口径一样,说这样写可以减少线程切换开销,从而大大提高程序的运行效率。但是当年我始终想不明白,这种写法与直接调用函数有什么区别,如下图所示。

直到后来我需要操作Kafka的时候,我明白了使用yield的好处。

探索

为了便于理解,我会把实际场景做一些简化,以方便说明事件的产生发展和解决过程。事件的起因是我需要把一些信息写入到Kafka中,我的代码一开始是这样的:

import time
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product):
    with topic.get_producer(delivery_reports=True) as producer:
        producer.produce(str(product).encode())
def feed():
    for i in range(10):
        consumer(i)
start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这段代码的运行效果如下图所示。

写入10条数据需要100秒,这样的龟速显然是有问题的。问题就出在这一句代码:

with topic.get_producer(delivery_reports=True) as producer

获得Kafka生产者对象是一个非常耗费时间的过程,每获取一次都需要10秒钟才能完成。所以写入10个数据就获取十次生产者对象。这消耗的100秒主要就是在获取生产者对象,而真正写入数据的时间短到可以忽略不计。

由于生产者对象是可以复用的,于是我对代码作了一些修改:

import timefrom pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    for i in range(10):
        products.append(i)
    consumer(products)
start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

首先把所有数据存放在一个列表中,最后再一次性给consumer函数。在一个Kafka生产者对象中展开列表,再把数据一条一条塞入Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。

这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。

于是我又修改了代码。每100条数据保存一次,并清空暂存的列表:

import timefrom pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    products = []
    for i in range(1003):
        products.append(i)
        if len(products) >= 100:
            consumer(products)
            products = []
    if products:
        consumer(products)
start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

由于最后一轮循环可能无法凑够100条数据,所以feed函数里面,循环结束以后还需要判断products列表是否为空,如果不为空,还要再消费一次。这样的写法,在上面这段代码中,一共1003条数据,每100条数据获取一次生产者对象,那么需要获取11次生产者对象,耗时至少为110秒。

显然,要解决这个问题,最直接的办法就是减少获取Kafka生产者对象的次数并最大限度复用生产者对象。如果读者举一反三的能力比较强,那么根据开关文件的两种写法:

# 写法一
with open('test.txt', 'w', encoding='utf-8') as f:
    f.write('xxx')
# 写法二
f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()

可以推测出获取Kafka生产者对象的另一种写法:

# 写法二
producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()

这样一来,只要获取一次生产者对象并把它作为全局变量就可以一直使用了。

然而,pykafka的官方文档中使用的是第一种写法,通过上下文管理器with来获得生产者对象。暂且不论第二种方式是否会报错,只从写法上来说,第二种方式必需要手动关闭对象。开发者经常会出现开了忘记关的情况,从而导致很多问题。而且如果中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然需要开发者手动关闭。

函数VS生成器

但是如果使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是yield派上用场的时候。

首先需要明白,使用yield以后,函数就变成了一个生成器。生成器与普通函数的不同之处可以通过下面两段代码来进行说明:

def funciton(i):
    print('进入')
    print(i)
    print('结束')
for i in range(5):
    funciton(i)

运行效果如下图所示。

函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。

而生成器可以从中间开始运行,从中间跳出。例如下面的代码:

def generator():
    print('进入')
    i = None
    while True:
        if i is not None:
            print(i)
        print('跳出')
        i = yield None
g = generator()
next(g)
for i in range(5):
    g.send(i)

运行效果如下图所示。

从图中可以看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据可以通过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据以后继续执行下一轮while循环,打印出被传进来的内容,然后到i = yield None的时候又跳出。如此反复。

所以回到最开始的Kafka问题。如果把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只需要获取一次Kafka生产者对象,然后就可以一直使用了?

根据这个逻辑,设计如下代码:

import timefrom pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer():
    with topic.get_producer(delivery_reports=True) as producer:
        print('init finished..')
        next_data = ''
        while True:
            if next_data:
                producer.produce(str(next_data).encode())
            next_data = yield True
def feed():
    c = consumer()
    next(c)
    for i in range(1000):
        c.send(i)
start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这一次直接插入1000条数据,总共只需要10秒钟,相比于每插入一次都获取一次Kafka生产者对象的方法,效率提高了1000倍。运行效果如下图所示。

后记

读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。但是第一段代码,也就是网上很多人讲yield的时候举的生产者-消费者的例子之所以会让人觉得毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差别。而我最后这一段代码,它的消费者分成两个部分,第一部分是获取Kafka生产者对象,这个过程非常耗时;第二部分是把数据通过Kafka生产者对象插入Kafka,这一部分运行速度极快。在这种情况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优势。


目录
相关文章
|
2月前
|
存储 人工智能 文字识别
医疗病历结构化处理系统技术白皮书——基于多模态AI的医联体数据治理方案
本系统基于双端协同架构,集成移动端OCR识别与云端数据分析,实现医疗文档高效结构化处理。采用PaddleOCR轻量引擎与隐私计算技术,支持离线识别与敏感信息脱敏。后端构建分布式数据仓库与多租户机制,满足PB级存储与数据安全合规要求。实测OCR准确率达96.2%(印刷体)与88.7%(手写体),字段抽取F1值92.4%,显著提升病历处理效率与质量。
323 3
|
芯片 开发者 SoC
E906的中断系统|学习笔记
快速学习 E906的中断系统
679 0
E906的中断系统|学习笔记
|
6月前
|
机器学习/深度学习 数据格式
R1-Omni开源!多模态模型+RLVR,让各模态作用清晰可见
随着 DeepSeek R1 的推出,强化学习在大模型领域的潜力被进一步挖掘。Reinforcement Learning with Verifiable Reward (RLVR) 方法的出现,为多模态任务提供了全新的优化思路,无论是几何推理、视觉计数,还是经典图像分类和物体检测任务,RLVR 都展现出了显著优于传统监督微调(SFT)的效果。
368 12
|
6月前
|
数据采集 算法 数据安全/隐私保护
【硬件测试】基于FPGA的MSK调制解调系统系统开发与硬件片内测试,包含信道模块,误码统计模块,可设置SNR
本文基于FPGA实现MSK调制解调系统,采用Verilog开发,包含同步模块、高斯信道模拟、误码率统计等功能。相比仿真版本,新增ILA数据采集与VIO在线SNR设置模块。通过硬件测试验证,展示不同SNR(如10dB和16dB)下的性能表现。研究聚焦软件无线电领域,优化算法复杂度以适应硬件限制,利用MSK恒定包络、相位连续等特性提升频谱效率。核心代码实现信号生成、调制解调、滤波及误码统计,提供完整的硬件设计与分析方案。
205 19
|
7月前
|
人工智能 Prometheus 监控
容器化AI模型的监控与治理:确保模型持续稳定运行
在前几篇文章中,我们探讨了AI模型的容器化部署及构建容器化机器学习流水线。然而,将模型部署到生产环境只是第一步,更重要的是确保其持续稳定运行并保持性能。为此,必须关注容器化AI模型的监控与治理。 监控和治理至关重要,因为AI模型在生产环境中面临数据漂移、概念漂移、模型退化和安全风险等挑战。全面的监控涵盖模型性能、数据质量、解释性、安全性和版本管理等方面。使用Prometheus和Grafana可有效监控性能指标,而遵循模型治理最佳实践(如建立治理框架、定期评估、持续改进和加强安全)则能进一步提升模型的可信度和可靠性。总之,容器化AI模型的监控与治理是确保其长期稳定运行的关键。
|
SQL 存储 缓存
什么?部署ClickHouse的服务器CPU利用率100%了?
什么?部署ClickHouse的服务器CPU利用率100%了?
|
7月前
|
SQL 数据安全/隐私保护 索引
SQL语句速成
《SQL语句速成》由blue编写,涵盖建表、插入、查询、更新、删除、视图创建、权限管理及索引操作等核心内容。通过具体示例介绍SQL基本语法和常用聚合函数,帮助读者快速掌握SQL编程技巧。发布于2024年7月19日。
122 7
|
10月前
|
安全 IDE Swift
探索iOS开发之旅:从初学者到专家
在这篇文章中,我们将一起踏上iOS开发的旅程,从基础概念的理解到深入掌握核心技术。无论你是编程新手还是希望提升技能的开发者,这里都有你需要的指南和启示。我们将通过实际案例和代码示例,展示如何构建一个功能齐全的iOS应用。准备好了吗?让我们一起开始吧!
|
关系型数据库 分布式数据库 PolarDB
PolarDB Ganos的实时时空计算
PolarDB是阿里云自主研发的云原生关系型数据库,提供极致弹性、高性能、海量存储及安全可靠的数据库服务。PolarDB PostgreSQL版100%兼容PostgreSQL和Oracle语法,集成Ganos——新一代云原生时空数据库引擎,具备几何、栅格、轨迹等十大核心引擎能力,支持物理世界时空多模数据的混合存储与分析。本文介绍的Ganos实时电子围栏计算基于PolarDB PostgreSQL版,适用于交通物流、禁飞区管理、营销等多种场景,通过Flink实时计算实现高效的空间数据处理。
157 1
|
SQL 分布式计算 数据挖掘
PyODPS
【7月更文挑战第19天】
403 2