只要你刚接触异步程序,就一定听过一句话:“协程越多越快”。
很遗憾,这句话通常只有前半句是真的。等你真正跑起来,就会发现一个非常扎心的现实:
协程开得越多,请求反而越慢,CPU 占用高得离谱,代理还频繁报错。
这篇文章就带你把这个现象讲清楚,再用一个真正完整的异步抓取实例帮你走通:
- 使用代理(以爬虫代理格式为参考)
- 抓学术论文的数据
- 提取标题、作者、机构、摘要、DOI、期刊等字段
- 数据存到 SQLite
- 最后还会告诉你:到底该怎么合理设置并发量
- 并提前告诉你初学者最容易踩的坑
一、我们今天要解决什么?
目标非常明确:
- 写一个真正可跑的异步学术抓取
- 使用爬虫代理 IP
- 从 DOAJ 抓取开放的论文元数据
- 提取核心字段(标题、作者、机构、摘要、关键词、DOI、年份、期刊、卷期、页码、学科分类、引用数)
- 存到 SQLite
- 最后解释协程越多越慢的原因
- 并准备一整套“读者易错点”的提示
二、动手之前的基础准备
你不需要太多背景知识,只需要:
- 基本 Python 语法
- 理解 async/await 是怎么一回事
- 对论文的一些字段有基本概念
够了,我们可以继续。
三、先说清楚:协程多了为什么会慢?
这里我想用最能让人听懂的方式来说清楚。
1. 事件循环需要调度协程,它不是无限扩容
协程全部跑在一个事件循环里。
如果你一下子扔进去 500、1000 个协程,就相当于让一个老师同时盯住 1000 名学生。
最后的结果就是:老师光切换注意力就累死了,根本没空讲课。
协程数量超过某个点,调度成本高到吓人,自然越来越慢。
2. 网络本身就是瓶颈,你协程再多也没用
很多人以为“我 await 了,所以网络比光速还快”。
现实是:
- 目标网站有响应速度限制
- 代理有带宽和并发限制
- 本地网络也有瓶颈
你开的协程越多,排队的人就越多,但网络不会因此自动提速。
3. 代理并发有限,你超出它的承载就只是在浪费时间
以亿牛云代理为例,不论你买的是动态还是固定套餐,都会有这样一些隐性或显性限制:
- 最大并发连接数
- 每秒允许的请求量(QPS)
- 超过阈值就返回错误或断开
你一口气开 300 个协程,代理可能只允许你同时连 20 条,其余的全部在排队浪费时间。
4. Python 的协程切换也要成本
协程切换虽然比线程轻量,但绝不是“零成本”。
当你开超过合理数量后,Python 自己忙于切换任务,效率下降得很明显。
换句话说:
不是协程越多越快,而是到一个合理数量后,多开的都是累赘。
四、正式上手:写一个能稳定工作的异步学术抓取
我们以 DOAJ 作为示例,这是开放的接口,非常适合学习。
第一步:安装依赖
pip install aiohttp aiosqlite lxml
第二步:准备代理
标准格式如下(参考爬虫代理):
http://用户名:密码@域名:端口
在代码中我们这样配置:
PROXY_HOST = "proxy.example.com"
PROXY_PORT = "12345"
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"
如果你的用户名或密码里有特殊符号,一定要 URL 编码。
第三步:准备 SQLite 数据结构
不用手动建表,代码中会自动创建。
字段包含标题、作者、机构、DOI、摘要、期刊、卷期、页码等。
五、完整代码
下面是完整脚本,复制即可运行:
import aiohttp
import asyncio
import aiosqlite
import json
# -----------------------------
# 爬虫代理配置(参考16YUN)
# -----------------------------
PROXY_HOST = "proxy.16yun.com"
PROXY_PORT = "12345"
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"
PROXY_URL = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
# DOAJ API 示例
API_URL = "https://doaj.org/api/v2/search/articles?page={}"
# -----------------------------
# 控制并发数:重点!协程不是越多越好!
# -----------------------------
MAX_CONCURRENCY = 10
sem = asyncio.Semaphore(MAX_CONCURRENCY)
def parse_paper(data):
"""从 DOAJ 返回中解析文章字段"""
bib = data["bibjson"]
return {
"title": bib.get("title"),
"authors": ", ".join(a["name"] for a in bib.get("author", [])),
"affiliation": ", ".join(a.get("affiliation", "") for a in bib.get("author", [])),
"email": ", ".join(a.get("email", "") for a in bib.get("author", [])),
"abstract": bib.get("abstract", ""),
"keywords": ", ".join(bib.get("keywords", [])),
"doi": bib.get("identifier", [{
}])[0].get("id", ""),
"pub_date": bib.get("year", ""),
"journal": bib.get("journal", {
}).get("title", ""),
"volume": bib.get("journal", {
}).get("volume", ""),
"issue": bib.get("journal", {
}).get("number", ""),
"pages": bib.get("pages", ""),
"category": ", ".join(bib.get("subject", [])),
"citations": bib.get("citation", 0),
}
async def fetch_page(session, page):
"""抓取某一页数据,带代理与限流"""
url = API_URL.format(page)
async with sem: # 限制并发,防止协程过载
try:
async with session.get(url, proxy=PROXY_URL, timeout=15) as resp:
data = await resp.json()
return data
except Exception as e:
print(f"第 {page} 页出错:{e}")
return None
async def save_to_db(db, paper):
"""存入 SQLite"""
await db.execute("""
INSERT INTO papers
(title, authors, affiliation, email, abstract, keywords, doi,
pub_date, journal, volume, issue, pages, category, citations)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
paper["title"], paper["authors"], paper["affiliation"], paper["email"],
paper["abstract"], paper["keywords"], paper["doi"], paper["pub_date"],
paper["journal"], paper["volume"], paper["issue"], paper["pages"],
paper["category"], paper["citations"]
))
async def main(total_pages=5):
async with aiohttp.ClientSession() as session, \
aiosqlite.connect("papers.db") as db:
# 自动建表
await db.execute("""
CREATE TABLE IF NOT EXISTS papers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
authors TEXT,
affiliation TEXT,
email TEXT,
abstract TEXT,
keywords TEXT,
doi TEXT,
pub_date TEXT,
journal TEXT,
volume TEXT,
issue TEXT,
pages TEXT,
category TEXT,
citations INTEGER
)
""")
tasks = [fetch_page(session, p) for p in range(1, total_pages + 1)]
results = await asyncio.gather(*tasks)
for res in results:
if not res:
continue
for item in res.get("results", []):
paper = parse_paper(item)
await save_to_db(db, paper)
await db.commit()
print("抓取结束!")
if __name__ == "__main__":
asyncio.run(main())
六、陷阱警告:这些地方是新人最容易犯错的
这部分非常重要,每个点都是真实踩坑后总结出来的。
陷阱 1:协程数量越多越快?现实完全相反
很多人写程序时会把并发设置成 200、300,甚至 1000。
但这样不仅不会更快,反而会让 CPU、代理、目标网站都跟你翻脸。
真正的经验是:低并发稳定、持续的吞吐量,远比高并发炸机更有效。
陷阱 2:代理本身就有限流,你越开越慢只是正常表现
代理服务商会限制你允许的最大连接数。
当并发超过代理的承载量时,多余的协程根本不会提升速度,只是在原地等待。
陷阱 3:异步不适合处理 CPU 密集任务
如果你在协程里跑长时间的 CPU 循环,整个事件循环都会被卡住。
异步适合 I/O 密集型任务,CPU 重活要让给线程池或进程池。
陷阱 4:缺少错误处理会让整个程序直接崩掉
没有超时处理、没有异常捕获,是新手抓取程序最常见的问题。
只要一个请求出错,没有保护的 await 可能导致整个任务失败。
七、常见问题的重点讲解
下面我把抓取程序运行中常见的四种现象拆开来说:
第一种:超时异常特别多。
通常是并发过高、代理负载过重,或者目标网站开启了限速策略。
解决方法就是降低协程数量,并加入合理的限流和重试。
第二种:CPU 占用突然飙到 100%。
很可能是你开了太多协程,事件循环忙到无法处理真正的任务。
降低并发即可缓解。
第三种:SQLite 写入速度慢得令人发指。
单连接串行写入会让速度大幅下降,你可以选择批量写入或者队列写入提升效率。
第四种:服务器频繁返回 429(Too Many Requests)。
说明你触发了对方的请求频率限制,需要加上 sleep、限速,或者进一步优化代理。