如何用aiohttp实现每秒千次的网页抓取

简介: 如何用aiohttp实现每秒千次的网页抓取

引言
在当今大数据时代,高效的网络爬虫是数据采集的关键工具。传统的同步爬虫(如requests库)由于受限于I/O阻塞,难以实现高并发请求。而Python的aiohttp库结合asyncio,可以轻松实现异步高并发爬虫,达到每秒千次甚至更高的请求速率。
本文将详细介绍如何使用aiohttp构建一个高性能爬虫,涵盖以下内容:

  1. aiohttp的基本原理与优势
  2. 搭建异步爬虫框架
  3. 优化并发请求(连接池、超时控制)
  4. 代理IP与User-Agent轮换(应对反爬)
  5. 性能测试与优化(实现1000+ QPS)
    最后,我们将提供一个完整的代码示例,并进行基准测试,展示如何真正实现每秒千次的网页抓取。
  6. aiohttp的基本原理与优势
    1.1 同步 vs. 异步爬虫
    ● 同步爬虫(如requests):每个请求必须等待服务器响应后才能继续下一个请求,I/O阻塞导致性能低下。
    ● 异步爬虫(aiohttp + asyncio):利用事件循环(Event Loop)实现非阻塞I/O,多个请求可同时进行,极大提高并发能力。
    1.2 aiohttp的核心组件
    ● ClientSession:管理HTTP连接池,复用TCP连接,减少握手开销。
    ● async/await语法:Python 3.5+的异步编程方式,使代码更简洁。
    ● asyncio.gather():并发执行多个协程任务。
  7. 搭建异步爬虫框架
    2.1 安装依赖
    2.2 基础爬虫示例
    import aiohttp
    import asyncio
    from bs4 import BeautifulSoup

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

async def parse(url):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url)
soup = BeautifulSoup(html, 'html.parser')
title = soup.title.string
print(f"URL: {url} | Title: {title}")

async def main(urls):
tasks = [parse(url) for url in urls]
await asyncio.gather(*tasks)

if name == "main":
urls = [
"https://example.com",
"https://python.org",
"https://aiohttp.readthedocs.io",
]
asyncio.run(main(urls))
代码解析:

  1. fetch() 发起HTTP请求并返回HTML。
  2. parse() 解析HTML并提取标题。
  3. main() 使用asyncio.gather()并发执行多个任务。
  4. 优化并发请求(实现1000+ QPS)
    3.1 使用连接池(TCP Keep-Alive)
    默认情况下,aiohttp会自动复用TCP连接,但我们可以手动优化:
    conn = aiohttp.TCPConnector(limit=100, force_close=False) # 最大100个连接
    async with aiohttp.ClientSession(connector=conn) as session:

    发起请求...

    3.2 控制并发量(Semaphore)
    避免因请求过多被目标网站封禁:
    semaphore = asyncio.Semaphore(100) # 限制并发数为100

async def fetch(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.text()
3.3 超时设置
防止某些请求卡住整个爬虫:
timeout = aiohttp.ClientTimeout(total=10) # 10秒超时
async with session.get(url, timeout=timeout) as response:

# 处理响应...
  1. 代理IP与User-Agent轮换(应对反爬)
    4.1 随机User-Agent
    from fake_useragent import UserAgent

ua = UserAgent()
headers = {"User-Agent": ua.random}

async def fetch(session, url):
async with session.get(url, headers=headers) as response:
return await response.text()
4.2 代理IP池
import aiohttp
import asyncio
from fake_useragent import UserAgent

代理配置

proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"

构建带认证的代理URL

proxy_auth = aiohttp.BasicAuth(proxyUser, proxyPass)
proxy_url = f"http://{proxyHost}:{proxyPort}"

ua = UserAgent()
semaphore = asyncio.Semaphore(100) # 限制并发数

async def fetch(session, url):
headers = {"User-Agent": ua.random}
timeout = aiohttp.ClientTimeout(total=10)
async with semaphore:
async with session.get(
url,
headers=headers,
timeout=timeout,
proxy=proxy_url,
proxy_auth=proxy_auth
) as response:
return await response.text()

async def main(urls):
conn = aiohttp.TCPConnector(limit=100, force_close=False)
async with aiohttp.ClientSession(connector=conn) as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)

if name == "main":
urls = ["https://example.com"] * 1000
asyncio.run(main(urls))

  1. 性能测试(实现1000+ QPS)
    5.1 基准测试代码
    import time

async def benchmark():
urls = ["https://example.com"] * 1000 # 测试1000次请求
start = time.time()
await main(urls)
end = time.time()
qps = len(urls) / (end - start)
print(f"QPS: {qps:.2f}")

asyncio.run(benchmark())
5.2 优化后的完整代码
import aiohttp
import asyncio
from fake_useragent import UserAgent

ua = UserAgent()
semaphore = asyncio.Semaphore(100) # 限制并发数

async def fetch(session, url):
headers = {"User-Agent": ua.random}
timeout = aiohttp.ClientTimeout(total=10)
async with semaphore:
async with session.get(url, headers=headers, timeout=timeout) as response:
return await response.text()

async def main(urls):
conn = aiohttp.TCPConnector(limit=100, force_close=False)
async with aiohttp.ClientSession(connector=conn) as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)

if name == "main":
urls = ["https://example.com"] * 1000
asyncio.run(main(urls))
5.3 测试结果
● 未优化(单线程requests):~10 QPS
● 优化后(aiohttp + 100并发):~1200 QPS
结论
通过aiohttp和asyncio,我们可以轻松构建一个高并发的异步爬虫,实现每秒千次以上的网页抓取。关键优化点包括:
✅ 使用ClientSession管理连接池
✅ 控制并发量(Semaphore)
✅ 代理IP和随机User-Agent防止封禁
✅ 超时设置避免卡死

相关文章
|
1月前
|
移动开发 网络协议 安全
什么是 DDos 攻击?怎样防 DDos 攻击?
DDoS(分布式拒绝服务攻击)通过大量非法请求耗尽目标服务器资源,使其无法正常服务。常见手段包括SYN Flood、HTTP Flood等。防御方法有流量清洗、集群防护、高防DNS等,阿里云提供专业DDoS高防服务,保障业务稳定运行。
|
1月前
|
存储 人工智能 NoSQL
万字解码 Agentic AI 时代的记忆系统演进之路
本文深入探讨了在 Agentic AI 时代,记忆(Memory) 作为智能体核心能力的定义、构建与技术演进。
万字解码 Agentic AI 时代的记忆系统演进之路
|
1月前
|
数据采集 存储 前端开发
动态渲染爬虫:Selenium抓取京东关键字搜索结果
动态渲染爬虫:Selenium抓取京东关键字搜索结果
|
1月前
|
JSON 安全 API
12306旅游产品数据抓取:Python+API逆向分析
12306旅游产品数据抓取:Python+API逆向分析
|
2月前
|
数据采集 存储 C++
Python异步爬虫(aiohttp)加速微信公众号图片下载
Python异步爬虫(aiohttp)加速微信公众号图片下载
|
1月前
|
数据采集 存储 JSON
地区电影市场分析:用Python爬虫抓取猫眼/灯塔专业版各地区票房
地区电影市场分析:用Python爬虫抓取猫眼/灯塔专业版各地区票房
|
1月前
|
数据采集 机器学习/深度学习 数据可视化
Python量化交易:结合爬虫与TA-Lib技术指标分析
Python量化交易:结合爬虫与TA-Lib技术指标分析
|
1月前
|
数据采集 存储 XML
Python爬虫XPath实战:电商商品ID的精准抓取策略
Python爬虫XPath实战:电商商品ID的精准抓取策略

热门文章

最新文章