当你的代码卡住了:聊聊Python里的“假同步真异步”

简介: 本文以小李爬虫卡顿为引,深入浅出解析I/O密集型任务的性能瓶颈,对比同步阻塞与异步并发的本质差异;详解线程池(ThreadPoolExecutor)和asyncio.to_thread等“零重写”提速方案,并警示假异步陷阱。实用、接地气,助你用最少改动获最大性能提升。(239字)

小李今天差点把电脑砸了。

他写了一个爬虫,要从一万个网站上抓数据。代码很简单:请求网址、解析内容、存进数据库。跑了十分钟,才抓了三百个。他打开任务管理器一看,CPU占用率才5%,网络流量几乎为零。

“我这电脑是i9啊,怎么就这水平?”
代理 IP 使用小技巧 让你的数据抓取效率翻倍 (49).png

问题出在哪?他的代码老老实实一个一个等:请求发出去,等服务器响应,等数据传回来,然后再发下一个。每个请求耗时0.5秒,一万个就是5000秒,一个多小时。但CPU大部分时间都在闲着,因为它在等网络。

这就是典型的I/O密集型任务。代码在等,但CPU不干活。

小李心想:能不能让它不等?发出去十个请求,谁先回来就处理谁?

当然能。这就是“异步”。

但问题是,他的代码是同步写的。改写成异步?几十个函数都得动,一堆库要换,想想就头大。

于是他想知道:有没有办法,让同步代码“假装”在异步执行?

先搞清楚:同步和异步到底差在哪
举个例子。

你点了三份外卖。同步的做法是:站在第一家店门口等,拿到第一份,再去第二家等,拿到第二份,再去第三家等。第二家店如果忙,你就干等着。全程啥也干不了。

异步的做法是:三家店都下单,然后回家坐着。谁做好了给你打电话,你去拿。中间你可以看电视、打游戏、甚至再点一份。

在代码里,“等”通常就是I/O操作——读文件、发HTTP请求、查数据库、等用户输入。这些操作的特点是:慢,但不太占CPU。

同步代码遇到I/O就卡住。异步代码遇到I/O就去干别的,等I/O完成了再回来继续。

那么问题来了:同步代码怎么异步执行?
Python里有个很直接的办法:扔进线程池。

说白了就是开几个“小弟”,每个小弟跑一个同步任务。主程序不用等,继续干自己的。

看个例子:

import time
import requests
from concurrent.futures import ThreadPoolExecutor

这是一个同步函数,请求一个网址

def fetch(url):
print(f"开始抓取 {url}")
response = requests.get(url) # 这里会等
print(f"抓取完成 {url}")
return response.status_code

十个网址

urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 11)]

同步执行:一个一个等

start = time.time()
for url in urls:
fetch(url)
print(f"同步耗时: {time.time() - start:.2f}秒")

异步执行:用线程池

start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(fetch, urls)
print(f"线程池耗时: {time.time() - start:.2f}秒")

跑一下你会发现:同步版本大概20秒(每个请求等2秒),线程池版本只要4秒左右。

神奇吗?不神奇。就是开了5个线程,每个线程处理两个请求,同时等。

但这里有个坑:线程池适合I/O任务,不适合CPU密集任务。你如果开10个线程做计算(比如循环一亿次),反而会因为线程切换开销变慢。

有没有更轻量的办法?有,asyncio
线程池虽然好用,但每个线程要占内存(大概8MB),开多了扛不住。而且线程切换有开销。

Python 3.4之后引入了asyncio,它是真正的异步,不靠线程,靠一个叫“事件循环”的东西。

但问题是,asyncio要求你的函数必须是异步的——也就是说,你要把requests换成aiohttp,把time.sleep换成asyncio.sleep,代码几乎要重写。

那有没有办法让同步代码跑在asyncio里?

有。asyncio.to_thread。

import asyncio
import requests

def sync_fetch(url):

# 这是一个同步函数,没法直接await
return requests.get(url).status_code

async def main():
urls = [...] # 十个网址

# 把同步函数扔到线程池里跑,但用异步的方式等待
tasks = [asyncio.to_thread(sync_fetch, url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)

asyncio.run(main())

这个方法的本质还是线程池,但写法更优雅,可以和真正的异步代码混用。

再深一层:事件循环是怎么骗过你的
如果你想知道“异步到底是怎么做到的”,我们得聊聊事件循环。

事件循环就像一个调度中心。它手里维护一个任务列表。每个任务要么在运行,要么在等某个事情(比如网络数据)。当一个任务说“我在等”,事件循环就把它挂起,去执行下一个任务。

等那个网络数据到了,事件循环再把任务唤醒,从刚才停下的地方继续。

听起来复杂,但Python的asyncio已经帮你封装好了。你只需要把函数写成async def,里面用await表示“这里要等”。

但问题是,我们手头有大量同步代码,不可能全改写成async def。

有没有一个黑科技,能把同步函数直接变成异步的?

有,但不太完美。asyncio提供了一个loop.run_in_executor,本质上还是线程池。真正的“把同步代码变成纯异步”是不可能的,因为同步代码里如果有time.sleep(10),那就是实打实地阻塞线程,谁也救不了你。

实战:给一个同步爬虫提速
假设你写了一个爬虫,大概是这样的:

def crawl_one(url):

# 发请求
r = requests.get(url)
# 解析
soup = BeautifulSoup(r.text, 'html.parser')
# 提取数据
title = soup.find('title').text
# 存数据库
db.insert({'url': url, 'title': title})
return title

def crawl_all(urls):
results = []
for url in urls:
results.append(crawl_one(url))
return results

要提速,最简单的改动:

from concurrent.futures import ThreadPoolExecutor, as_completed

def crawl_all_parallel(urls, workers=10):
results = []
with ThreadPoolExecutor(max_workers=workers) as executor:

    # 提交所有任务
    future_to_url = {executor.submit(crawl_one, url): url for url in urls}
    # 谁先完成就处理谁
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            results.append(result)
            print(f"完成: {url}")
        except Exception as e:
            print(f"失败: {url}, 错误: {e}")
return results

就这么几行改动,速度提升接近workers倍(受限于网络带宽和对方服务器的承受能力)。

但要注意:如果你的crawl_one里用了数据库连接,得确保数据库连接是线程安全的。很多数据库驱动不是,这时候你可能需要每个线程单独创建连接。

真正的异步:怎么把同步库改成异步?
有时候你不得不面对一个现实:你想用的库只有同步版本,比如requests、pymysql、redis-py(老版本)。

三个办法:

方法一:线程池包装

async def async_get(url):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, requests.get, url)

这个None表示使用默认的线程池。简单,但每个调用都会占用一个线程。

方法二:找异步替代品

改代码是麻烦,但一旦改完,性能提升显著,而且不占线程。

方法三:用anyio或trio

这两个库提供了更高级的抽象,可以让同步代码在异步环境中运行得更自然。但学习曲线比较陡,不推荐新手尝试。

一个容易踩的坑:假异步
很多人写异步代码,写着写着就变成这样了:

async def fetch(url):
response = requests.get(url) # 同步操作!
return response.text

async def main():
tasks = [fetch(url) for url in urls]
await asyncio.gather(*tasks)

你猜怎么着?完全没有提速。

因为requests.get是同步阻塞的。当你await一个任务时,这个任务如果内部阻塞了,整个事件循环都会被卡住。

记住一句话:异步的传染性。一旦你用了async,从调用链的根到叶子,所有涉及I/O的地方都必须是异步的。中间混了一个同步阻塞调用,整个异步就废了。

检查方法很简单:在代码里搜requests、time.sleep、open这些同步操作,看它们是否出现在async函数里。

有没有更激进的方案?有,但不太推荐
方案一:gevent

gevent是一个第三方库,它通过“打补丁”的方式,把Python标准库里的同步I/O操作(比如socket、time.sleep)偷偷替换成异步版本。你不需要写async/await,代码看起来完全是同步的,但实际是异步执行的。

from gevent import monkey
monkey.patch_all() # 这行会替换标准库
import requests # 现在requests是异步的了

from gevent.pool import Pool

def fetch(url):
return requests.get(url).status_code

pool = Pool(10)
urls = [...]
results = pool.map(fetch, urls)

看起来很美好,但问题也不少:

方案二:curio或trio

这两个是比asyncio更现代、更易用的异步库。但生态不如asyncio,第三方支持少。

实际项目里怎么选?
我见过很多团队纠结这个问题。给你一个决策树:

一个完整的例子:混合方案
假设你有这样一个需求:从1000个API抓数据,然后对每个数据做一次CPU密集计算(比如图像处理)。

混合方案最合适:

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests
import time

CPU密集函数

def process_data(data):

# 假设这里有复杂的计算
time.sleep(0.1)  # 模拟计算
return data * 2

I/O密集函数

def fetch_data(url):
return requests.get(url).json()

async def main():
urls = [...] # 1000个URL

# 用线程池处理I/O
with ThreadPoolExecutor(max_workers=50) as io_executor:
    loop = asyncio.get_running_loop()
    fetch_tasks = [
        loop.run_in_executor(io_executor, fetch_data, url)
        for url in urls
    ]
    raw_data = await asyncio.gather(*fetch_tasks)

# 用进程池处理CPU密集任务
with ProcessPoolExecutor(max_workers=8) as cpu_executor:
    process_tasks = [
        loop.run_in_executor(cpu_executor, process_data, data)
        for data in raw_data
    ]
    results = await asyncio.gather(*process_tasks)

return results

asyncio.run(main())

这个方案里:

总结一句话
同步代码想异步执行,最简单的就是线程池。想要更高效更优雅,就用asyncio配合to_thread。但记住:没有银弹,真正的异步需要你从底层改起。

回到小李的爬虫。他最后选择了ThreadPoolExecutor,改了10行代码,速度提升了8倍。虽然不完美,但够用了。

“够用就好”这四个字,在工程里往往比“最优”更重要。

目录
相关文章
|
7月前
|
数据采集 人工智能 编解码
AI出码率70%+的背后:高德团队如何实现AI研发效率的量化与优化
本文系统阐述了在AI辅助编程快速发展的背景下,如何构建一套科学、可落地的研发效率量化指标体系
2063 27
AI出码率70%+的背后:高德团队如何实现AI研发效率的量化与优化
|
Unix 网络安全 数据安全/隐私保护
putty Faual Error:No supported authentication methods available (server sent: publickey)
putty Faual Error:No supported authentication methods available (server sent: publickey)
3714 0
|
关系型数据库 MySQL 中间件
MySQL 中如何实现分库分表?常见的分库分表策略有哪些?
在MySQL中,分库分表(Sharding)通过将数据分散到多个数据库或表中,以应对大量数据带来的性能和扩展性问题。常见策略包括:哈希分片(分布均匀,查询效率高)、范围分片(适合范围查询)、列表分片(适用于特定值查询)、复合分片(灵活性高)和动态分片(灵活应对负载变化)。每种策略各有优劣,需根据业务需求选择。常用工具如MyCAT、ShardingSphere和TDDL可简化实现过程。
|
Ubuntu Oracle 关系型数据库
Oracle VM VirtualBox之Ubuntu 22.04LTS双网卡网络模式配置
这篇文章是关于如何在Oracle VM VirtualBox中配置Ubuntu 22.04LTS虚拟机双网卡网络模式的详细指南,包括VirtualBox网络概述、双网卡网络模式的配置步骤以及Ubuntu系统网络配置。
2651 3
|
Kubernetes API 调度
在K8S中,创建pod的过程是什么?
在K8S中,创建pod的过程是什么?
|
XML 数据可视化 机器人
08 ROS的其他常见工具
本文概述了ROS(机器人操作系统)中的一些常见工具包,包括rqt工具箱、Rviz三维可视化工具、Gazebo物理仿真环境和rosbag数据记录与回放工具的使用方法和功能。
435 0