协程+连接池:高并发Python爬虫的底层优化逻辑

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 协程+连接池:高并发Python爬虫的底层优化逻辑

一、性能瓶颈的根源:同步阻塞I/O与TCP握手
在优化之前,必须理解传统同步爬虫为何缓慢。

  1. 同步阻塞I/O(Synchronous Blocking I/O):使用requests.get()时,程序会发起一个HTTP请求,然后线程会一直等待,直到远端服务器返回响应。在这个等待过程中,CPU大部分时间是空闲的,造成了巨大的资源浪费。这就像只有一个收银员的超市,每个顾客都必须等到前一个顾客完成全部结账流程后才能开始,效率极低。
  2. 昂贵的TCP连接建立:HTTP基于TCP协议。每次requests.get()都会经历一次TCP三次握手的过程。在高并发场景下,频繁地创建和销毁连接会产生巨大的开销,成为主要的性能瓶颈之一。
    为了解决这两个问题,我们的武器库里有两大法宝:协程解决I/O等待问题,连接池解决TCP连接复用问题。
    二、核心武器一:协程(Coroutine)—— I/O等待的“调度艺术”
    协程,又称微线程,是一种用户态的轻量级线程。其核心优势在于由用户自行控制调度,在I/O操作时主动让出(yield)CPU,而不是被操作系统强制挂起。
    底层逻辑:事件循环(Event Loop)与异步I/O
  3. 事件循环(The Event Loop):这是asyncio的核心。它是一个无限循环,负责监听和管理所有的事件和任务。你可以把它想象成一个极其高效的项目经理。
  4. 任务(Tasks):每一个异步函数(async def)都会被包装成一个Task。
  5. 可等待对象(Awaitables):当任务执行到await语句(通常是I/O操作,如网络请求、读写文件)时,会发生以下神奇的事情:
    ○ 该任务会立即告知事件循环:“我要进行I/O操作了,这会很慢,别等我,你先去处理其他准备好了的任务吧。”
    ○ 事件循环于是暂停(挂起)当前任务,转而执行其他已经准备好继续运行的任务。
    ○ 当底层的操作系统完成I/O操作(如收到服务器响应)后,事件循环会收到通知,并在适当的时机恢复执行刚才被挂起的任务,从await之后的地方继续运行。
    这个过程是单线程的,通过在I/O等待期间切换任务,极大地提高了CPU的利用率,从而在单位时间内可以发起成千上万个网络请求。
    简单比喻:同步阻塞是单线流水线,一个环节卡住整条线停止。协程是多线流水线,一个环节(I/O)卡住,工人(CPU)立刻去处理其他流水线上的工作,从而保证工人永远在忙碌。
    三、核心武器二:连接池(Connection Pool)—— TCP连接的“资源管家”
    连接池是另一个被严重低估的底层优化。它的核心思想是:复用,而不是重建。
    底层逻辑:TCP连接复用
    一个httpx.AsyncClient或aiohttp.ClientSession对象内部默认维护着一个连接池。
  6. 当你的爬虫发起第一个请求时:客户端会与目标服务器建立一条TCP连接(经历三次握手)。
  7. 请求完成后:这条连接不会立即关闭,而是被放入一个名为“连接池”的容器中,并标记为空闲状态。
  8. 当你的爬虫发起下一个请求(至同一主机)时:客户端不会创建新的TCP连接,而是直接从连接池中取出这条空闲的、已经建立好的连接来发送新的HTTP请求。
    这样做带来了两大核心好处:
    ● 极大降低延迟:避免了每次请求都进行TCP三次握手和SSL握手(对于HTTPS)的开销,请求响应速度更快。
    ● 减轻系统负担:大幅减少了操作系统因频繁创建和销毁socket端口所带来的资源消耗。
    没有连接池:10个请求 => 10次TCP握手 => 10个socket。
    有连接池:10个请求 => 1次TCP握手 => 复用1个socket => 性能提升一个数量级。
    四、实战:构建基于协程与连接池的高并发爬虫
    下面我们使用httpx库(同时支持HTTP/1.1和HTTP/2,API更现代)来演示如何正确利用这两大武器。
  9. 错误示范:没有连接池的异步爬虫
    ```import asyncio
    import httpx
    import time

async def fetch_no_pool(url):
"""错误示范:每次请求都创建新的连接,无法复用TCP连接"""
async with httpx.AsyncClient() as client: # 每次都创建新的Client对象
response = await client.get(url)
return response.text[:200] # 返回部分内容

async def main_no_pool():
url = "https://httpbin.org/get"
tasks = [fetch_nopool(url) for in range(10)]
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"无连接池模式 耗时: {end_time - start_time:.2f} 秒")

# for result in results:
#     print(result)

asyncio.run(main_no_pool())

输出可能: 无连接池模式 耗时: 1.85 秒
问题分析: 虽然用了协程并发,但每个任务都创建独立的AsyncClient,导致TCP连接无法复用,性能依然低下。
2. 正确示范:协程 + 连接池的最佳实践
```import asyncio
import httpx
import time

async def fetch_with_pool(client, url):
    """正确示范:复用同一个Client及其连接池"""
    response = await client.get(url)
    return response.text[:200]

async def main_with_pool():
    url = "https://httpbin.org/get"
    # 关键步骤:在整个爬虫生命周期内,共享同一个AsyncClient实例
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=10, keepalive_expiry=30),
        timeout=httpx.Timeout(10.0)
    ) as client:
        tasks = [fetch_with_pool(client, url) for _ in range(10)]
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        print(f"协程+连接池模式 耗时: {end_time - start_time:.2f} 秒")
        # for result in results:
        #     print(result)

# asyncio.run(main_with_pool())

输出可能: 协程+连接池模式 耗时: 0.45 秒
性能对比: 正确的方法比错误的方法快了近4倍!这其中的巨大差异,主要就来源于连接池避免的TCP握手开销。

  1. 高级优化:精细化配置连接池与重试机制
    一个生产级的爬虫还需要考虑限流、重试和代理。
    ```from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
    import httpx
    import asyncio

代理配置信息

proxyHost = "www.16yun.cn"
proxyPort = "5445"
proxyUser = "16QMSOML"
proxyPass = "280651"

class HighConcurrencyCrawler:
def init(self, concurrency=10, use_proxy=True):

    # 精细化配置连接池参数
    self.limits = httpx.Limits(
        max_connections=concurrency, # 最大连接数
        max_keepalive_connections=concurrency, # 最大保持活跃的连接数
        keepalive_expiry=10 # 活跃连接保持时间(秒)
    )
    self.timeout = httpx.Timeout(10.0)
    self.client = None
    self.use_proxy = use_proxy
    # 构造代理URL(多种格式)
    self.proxy_url = f"http://{proxyUser}:{proxyPass}@{proxyHost}:{proxyPort}"
    self.proxies = {
        "http://": self.proxy_url,
        "https://": self.proxy_url,
    }

async def __aenter__(self):
    # 根据是否使用代理来初始化客户端
    if self.use_proxy:
        self.client = httpx.AsyncClient(
            limits=self.limits,
            timeout=self.timeout,
            proxies=self.proxies  # 方式一:使用代理字典
            # 或者使用以下方式:
            # proxies=self.proxy_url  # 方式二:直接使用代理URL字符串
        )
    else:
        self.client = httpx.AsyncClient(
            limits=self.limits,
            timeout=self.timeout
        )
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.client.aclose()

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception((httpx.NetworkError, httpx.HTTPStatusError))
)
async def fetch_url(self, url):
    try:
        # 方式三:也可以在每次请求时单独设置代理(更灵活)
        # proxies = self.proxies if self.use_proxy else None
        # resp = await self.client.get(url, proxies=proxies)

        resp = await self.client.get(url)
        resp.raise_for_status()
        return resp.text
    except httpx.ProxyError as e:
        print(f"代理连接错误: {e}")
        raise
    except Exception as e:
        print(f"Request failed for {url}: {e}")
        raise

async def crawl(self, urls):
    tasks = [self.fetch_url(url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)

使用代理的示例

async def main_with_proxy():
urls = ["https://httpbin.org/ip"] * 5 # 使用这个URL可以查看当前使用的IP
async with HighConcurrencyCrawler(concurrency=5, use_proxy=True) as crawler:
results = await crawler.crawl(urls)

# 输出结果查看代理是否生效
for i, result in enumerate(results):
    if not isinstance(result, Exception):
        print(f"结果 {i+1}: {result}")
    else:
        print(f"请求 {i+1} 失败: {result}")

不使用代理的示例(用于对比)

async def main_without_proxy():
urls = ["https://httpbin.org/ip"] * 3
async with HighConcurrencyCrawler(concurrency=3, use_proxy=False) as crawler:
results = await crawler.crawl(urls)

for i, result in enumerate(results):
    if not isinstance(result, Exception):
        print(f"直连结果 {i+1}: {result}")
    else:
        print(f"直连请求 {i+1} 失败: {result}")

更灵活的代理使用方式:轮询多个代理

class ProxyRotatorCrawler(HighConcurrencyCrawler):
def init(self, concurrency=10, proxy_list=None):
super().init(concurrency, use_proxy=True)
self.proxy_list = proxy_list or [self.proxy_url]
self.current_proxy_index = 0

def get_next_proxy(self):
    """轮询获取下一个代理"""
    proxy = self.proxy_list[self.current_proxy_index]
    self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxy_list)
    return proxy

async def fetch_url(self, url):
    try:
        # 每次请求使用不同的代理
        current_proxy = self.get_next_proxy()
        resp = await self.client.get(url, proxies=current_proxy)
        resp.raise_for_status()
        return resp.text
    except Exception as e:
        print(f"Request failed for {url} with proxy {current_proxy}: {e}")
        raise

if name == "main":

# 运行带代理的爬虫
print("=== 使用代理访问 ===")
asyncio.run(main_with_proxy())

print("\n=== 直连访问 ===")
asyncio.run(main_without_proxy())

```
五、总结:1+1>2的优化哲学
通过深度剖析,我们可以看到:
● 协程 是“大脑”,通过事件循环和任务调度,解决了CPU因I/O等待而空闲的问题,实现了高并发。
● 连接池 是“心脏”,通过TCP连接的复用,解决了频繁握手带来的巨大开销问题,实现了高性能。
二者并非孤立存在,而是相辅相成、缺一不可的有机整体。只使用协程而忽视连接池,爬虫的性能天花板会非常低;只复用连接而采用同步阻塞模式,则无法应对高并发场景。

相关文章
|
27天前
|
数据采集 Web App开发 数据安全/隐私保护
实战:Python爬虫如何模拟登录与维持会话状态
实战:Python爬虫如何模拟登录与维持会话状态
|
2月前
|
数据采集 Web App开发 自然语言处理
新闻热点一目了然:Python爬虫数据可视化
新闻热点一目了然:Python爬虫数据可视化
|
1月前
|
数据采集 监控 数据库
Python异步编程实战:爬虫案例
🌟 蒋星熠Jaxonic,代码为舟的星际旅人。从回调地狱到async/await协程天堂,亲历Python异步编程演进。分享高性能爬虫、数据库异步操作、限流监控等实战经验,助你驾驭并发,在二进制星河中谱写极客诗篇。
Python异步编程实战:爬虫案例
|
2月前
|
数据采集 存储 XML
Python爬虫技术:从基础到实战的完整教程
最后强调: 父母法律法规限制下进行网络抓取活动; 不得侵犯他人版权隐私利益; 同时也要注意个人安全防止泄露敏感信息.
644 19
|
1月前
|
数据采集 存储 JSON
Python爬虫常见陷阱:Ajax动态生成内容的URL去重与数据拼接
Python爬虫常见陷阱:Ajax动态生成内容的URL去重与数据拼接
|
1月前
|
数据采集 存储 JavaScript
解析Python爬虫中的Cookies和Session管理
Cookies与Session是Python爬虫中实现状态保持的核心。Cookies由服务器发送、客户端存储,用于标识用户;Session则通过唯一ID在服务端记录会话信息。二者协同实现登录模拟与数据持久化。
|
2月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
2月前
|
数据采集 Web App开发 前端开发
处理动态Token:Python爬虫应对AJAX授权请求的策略
处理动态Token:Python爬虫应对AJAX授权请求的策略
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
949 0
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
452 1

推荐镜像

更多