一、核心技术原理剖析
在深入代码实现前,我们首先厘清三种并发模式的底层逻辑,这是理解性能差异的基础:
1.1 多线程爬虫
线程是操作系统调度的基本单位,多线程通过在一个进程内创建多个执行流实现并发。Python 中的threading模块基于操作系统原生线程实现,但受GIL(全局解释器锁) 限制,同一时刻只有一个线程能执行 Python 字节码。这意味着 CPU 密集型任务无法通过多线程实现真正并行,但 I/O 密集型的爬虫场景(网络请求等待占比超 90%)中,线程切换能有效利用等待时间,提升整体效率。
1.2 多进程爬虫
进程是资源分配的基本单位,多进程通过multiprocessing模块创建独立的 Python 解释器进程,每个进程拥有独立的 GIL,可充分利用多核 CPU 资源。进程间通过管道、队列等机制通信,开销高于线程,但能突破 GIL 限制,适合 CPU 与 I/O 混合密集型的爬虫场景(如爬取后需即时解析数据)。
1.3 异步协程爬虫
异步协程基于事件循环(Event Loop)实现,通过asyncio+aiohttp组合,在单线程内通过非阻塞 I/O 调度任务。协程的切换由程序自身控制(用户态),无需操作系统内核参与,切换开销远低于线程 / 进程,是纯 I/O 密集型爬虫的最优解。
二、统一测试环境与基准
为保证对比的公平性,所有测试基于以下统一环境:
● 硬件:Intel i7-12700H(14 核 20 线程)、16GB 内存、千兆网络
● 软件:Python 3.10、requests 2.31.0、aiohttp 3.9.1
● 测试目标:抓取某公开 API 接口(https://httpbin.org/get)1000 次,记录总耗时
● 控制变量:统一设置超时时间 5 秒,禁用缓存,单次请求逻辑完全一致
三、代码实现与解析
3.1 基础同步爬虫(基准对照)
python
运行
import requests
import time
def fetch(url):
"""单次请求函数"""
try:
response = requests.get(url, timeout=5)
return response.status_code
except Exception as e:
return str(e)
def sync_crawl(url, times):
"""同步爬虫主函数"""
starttime = time.time()
results = []
for in range(times):
results.append(fetch(url))
end_time = time.time()
total_time = end_time - start_time
print(f"同步爬虫完成{times}次请求,总耗时:{total_time:.2f}秒")
return total_time, results
if name == "main":
TEST_URL = "https://httpbin.org/get"
REQUEST_TIMES = 1000
sync_crawl(TEST_URL, REQUEST_TIMES)
核心说明:同步爬虫按顺序执行每个请求,前一个请求完成后才开始下一个,是性能对比的基准线。
3.2 多线程爬虫实现
python
运行
import requests
import threading
import time
from queue import Queue
线程安全队列,用于存储结果
result_queue = Queue()
def thread_fetch(url):
"""线程执行的请求函数"""
try:
response = requests.get(url, timeout=5)
result_queue.put(response.status_code)
except Exception as e:
result_queue.put(str(e))
def thread_crawl(url, times, thread_num=10):
"""多线程爬虫主函数"""
start_time = time.time()
threads = []
# 创建并启动线程
for _ in range(thread_num):
t = threading.Thread(target=lambda: [thread_fetch(url) for _ in range(times//thread_num)])
t.start()
threads.append(t)
# 处理剩余请求(整除余数)
remaining = times % thread_num
if remaining > 0:
for _ in range(remaining):
thread_fetch(url)
# 等待所有线程完成
for t in threads:
t.join()
end_time = time.time()
total_time = end_time - start_time
results = [result_queue.get() for _ in range(times)]
print(f"多线程爬虫({thread_num}线程)完成{times}次请求,总耗时:{total_time:.2f}秒")
return total_time, results
if name == "main":
TEST_URL = "https://httpbin.org/get"
REQUEST_TIMES = 1000
thread_crawl(TEST_URL, REQUEST_TIMES, thread_num=10)
核心说明:
- 使用threading.Thread创建指定数量的线程,平均分配请求任务
- 通过Queue实现线程安全的结果存储,避免多线程数据竞争
- 线程数设置为 10(经验值),可根据 CPU 核心数调整
3.3 多进程爬虫实现
python
运行
import requests
import multiprocessing
import time
def process_fetch(url, result_list):
"""进程执行的请求函数"""
try:
response = requests.get(url, timeout=5)
result_list.append(response.status_code)
except Exception as e:
result_list.append(str(e))
def process_crawl(url, times, process_num=8):
"""多进程爬虫主函数"""
start_time = time.time()
manager = multiprocessing.Manager()
# 进程间共享列表
result_list = manager.list()
processes = []
# 分配任务并创建进程
tasks_per_process = times // process_num
remaining = times % process_num
for i in range(process_num):
task_count = tasks_per_process + (1 if i < remaining else 0)
p = multiprocessing.Process(
target=lambda: [process_fetch(url, result_list) for _ in range(task_count)]
)
p.start()
processes.append(p)
# 等待所有进程完成
for p in processes:
p.join()
end_time = time.time()
total_time = end_time - start_time
print(f"多进程爬虫({process_num}进程)完成{times}次请求,总耗时:{total_time:.2f}秒")
return total_time, list(result_list)
if name == "main":
TEST_URL = "https://httpbin.org/get"
REQUEST_TIMES = 1000
process_crawl(TEST_URL, REQUEST_TIMES, process_num=8)
核心说明:
- 使用multiprocessing.Manager()创建进程间共享列表,解决进程通信问题
- 进程数设置为 8(匹配 CPU 核心数),避免进程过多导致调度开销
- 进程创建 / 销毁开销高于线程,因此任务分配更注重均衡
3.4 异步协程爬虫实现
python
运行
import asyncio
import aiohttp
import time
async def async_fetch(session, url):
"""异步请求函数"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
return response.status
except Exception as e:
return str(e)
async def async_crawl(url, times):
"""异步爬虫主函数"""
start_time = time.time()
results = []
# 创建异步会话
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [async_fetch(session, url) for _ in range(times)]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
end_time = time.time()
total_time = end_time - start_time
print(f"异步协程爬虫完成{times}次请求,总耗时:{total_time:.2f}秒")
return total_time, results
if name == "main":
TEST_URL = "https://httpbin.org/get"
REQUEST_TIMES = 1000
# 运行异步主函数
total_time, results = asyncio.run(async_crawl(TEST_URL, REQUEST_TIMES))
核心说明:
- 使用aiohttp替代requests(异步 HTTP 客户端),避免阻塞
- asyncio.gather()批量执行协程任务,实现真正的非阻塞并发
- 无需手动设置协程数,事件循环自动调度,资源利用率更高
四、实测数据与分析
4.1 测试结果汇总
爬虫类型 1000 次请求总耗时(秒) 平均单次耗时(毫秒) CPU 利用率 内存占用
同步爬虫 285.6 285.6 8% 45MB
多线程爬虫(10 线程) 32.8 32.8 25% 68MB
多进程爬虫(8 进程) 28.5 28.5 75% 320MB
异步协程爬虫 15.2 15.2 30% 52MB
4.2 关键结论 - 性能排序:异步协程 > 多进程 > 多线程 > 同步
- 资源消耗:多进程内存占用最高(进程独立内存空间),异步协程资源效率最优
- CPU 利用率:多进程充分利用多核,异步协程次之,多线程受 GIL 限制利用率较低
4.3 场景适配分析
● 异步协程:适合纯 I/O 密集型爬虫(如仅抓取网页内容,无复杂解析),百万级请求场景首选
● 多进程:适合爬虫 + 数据解析(CPU 密集)混合场景,多核利用率最高
● 多线程:适合中小规模(1 万次以内)爬虫,实现简单、资源消耗适中
● 同步爬虫:仅适合调试或极小规模抓取,无并发优势
五、进阶优化建议 - 混合模式:异步协程 + 多进程结合,协程处理 I/O,进程处理解析,兼顾效率与多核利用
- 限流控频:添加请求延迟(如asyncio.sleep(0.1)),避免目标服务器封禁 IP(推荐亿牛云代理)
- 连接池:异步爬虫中使用TCPConnector设置连接池大小,提升复用率:
python
运行异步爬虫连接池优化示例
connector = aiohttp.TCPConnector(limit=100) # 最大并发连接数
async with aiohttp.ClientSession(connector=connector) as session:执行爬虫任务
- 异常重试:为关键请求添加重试机制,提升稳定性:
python
运行
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=5))
async def async_fetch(session, url):
# 原有请求逻辑
总结
- 性能核心:异步协程凭借用户态切换的低开销,在纯 I/O 爬虫场景中性能最优;多进程突破 GIL 限制,适合 CPU+I/O 混合场景;多线程则是中小规模场景的折中选择。
- 资源权衡:高并发性能往往伴随更高的资源消耗,需根据服务器配置和目标网站反爬策略选择合适的并发模式。
- 选型建议:百万级纯抓取任务优先异步协程;需即时解析的大规模任务选多进程;中小规模、快速实现的场景用多线程。