做数据采集的同行们,在爬虫进阶的路上肯定都遇到过这个瓶颈:当目标数据量从几百条飙升到十万级别,尤其是涉及图片、视频等多媒体文件时,普通的单线程下载不仅慢得让人怀疑人生,还极其容易触发目标服务器的反爬策略,导致IP被封。市面上的第三方下载工具要么限速,要么免费额度太少。
想要把稳定性握在自己手里,完全控制并发量和代理轮换,我们需要自己造轮子。今天咱们就从底层逻辑出发,用 Python 原生库写一个支持并发、且原生集成代理池的流媒体下载器。
多线程分段下载的底层逻辑
很多人觉得 Python 有 GIL(全局解释器锁),多线程是“伪多线程”。但在网络请求这种 I/O 密集型任务中,线程在等待网络响应时会释放 GIL,因此多线程并发下载是非常高效的。 多线程下载一个文件的核心流程其实就四步:- 发送 HEAD 请求获取目标文件的大小。
- 根据设置的线程数,把文件切分成多个区块。
- 利用 HTTP 的 Range 请求头,让不同线程去并发下载对应的区间数据。
- 所有分段下载完成后,在本地将临时文件按顺序拼接成完整文件。
爬虫代理的无缝接入
只要并发量一上去,单 IP 极其容易被目标服务器拉黑。想要稳定跑批量任务,利用代理IP池分散请求来源是唯一解。 优质代理的使用有几个黄金原则: 代理要轮换 、 要有失效检测 、并且 控制单IP的请求频率 。在下面的实战代码中,我们将直接接入爬虫代理。该产品提供了标准化的 HTTP/HTTPS 接口,我们可以将其封装进 requests 请求的 proxies 参数中。核心代码实战
下面是一段整合了分段下载与爬虫代理接入的核心代码示例,原生依赖 requests ,不需要花里胡哨的第三方框架。代码中我加了详尽的中文注释,大家可以直接复制去跑:import requests
import time
import os
from concurrent.futures import ThreadPoolExecutor
class ProxyDownloader:
def __init__(self, target_url, save_path, max_workers=5):
self.target_url = target_url
self.save_path = save_path
self.max_workers = max_workers
# 亿牛云代理配置信息 (需替换为你官网后台获取的真实信息)
self.proxy_domain = "proxy.16yun.cn" # 代理服务器域名或IP
self.proxy_port = "8100" # 代理服务器端口
self.proxy_user = "16YUNXXXXX" # 通行证账号
self.proxy_pass = "PASSXXXXX" # 通行证密码
def _get_proxies(self):
"""
构造requests所需的proxies字典,实现代理接入
"""
proxy_meta = f"http://{self.proxy_user}:{self.proxy_pass}@{self.proxy_domain}:{self.proxy_port}"
return {
"http": proxy_meta,
"https": proxy_meta
}
def _download_chunk(self, start, end, chunk_id):
"""
使用代理下载指定的文件区块
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
# 核心:通过Range请求头告诉服务器我们需要哪一段数据
"Range": f"bytes={start}-{end}"
}
chunk_filename = f"{self.save_path}.part{chunk_id}"
proxies = self._get_proxies()
# 指数退避重试机制,防止网络抖动导致的下载失败
for attempt in range(3):
try:
# 必须设置 timeout,避免无响应的请求一直挂起占用线程
resp = requests.get(
self.target_url,
headers=headers,
proxies=proxies,
timeout=15,
stream=True
)
resp.raise_for_status()
# 将分段数据写入本地临时文件
with open(chunk_filename, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"[成功] 分段 {chunk_id} 下载完成。")
return chunk_filename
except requests.RequestException as e:
print(f"[重试] 分段 {chunk_id} 第 {attempt + 1} 次下载失败: {e}")
# 失败后等待 2的attempt次方秒再重试 (1秒, 2秒, 4秒)
time.sleep(2 ** attempt)
print(f"[错误] 分段 {chunk_id} 下载彻底失败,已超过最大重试次数。")
return None
def run(self):
"""
主调度器:获取文件大小 -> 划分区块 -> 多线程并发下载 -> 合并文件
"""
print("开始解析资源...")
proxies = self._get_proxies()
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
# 1. 获取文件大小
try:
resp = requests.head(self.target_url, headers=headers, proxies=proxies, timeout=10, allow_redirects=True)
resp.raise_for_status()
except requests.RequestException:
print("HEAD请求失败,无法获取文件信息。")
return
content_length = resp.headers.get("Content-Length")
# 检查服务器是否支持分段下载 (Accept-Ranges: bytes)
if not content_length or resp.headers.get("Accept-Ranges") != "bytes":
print("服务器不支持 Range 分段下载,多线程策略失效,需退化为单线程。")
return
file_size = int(content_length)
print(f"目标文件总大小: {file_size / 1024 / 1024:.2f} MB")
# 2. 计算每个线程负责的区块大小
chunk_size = file_size // self.max_workers
futures = []
temp_files = []
# 3. 启动线程池进行并发分段下载
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for i in range(self.max_workers):
start = i * chunk_size
# 最后一个区块必须兜底处理剩余的所有字节
end = (i + 1) * chunk_size - 1 if i < self.max_workers - 1 else file_size - 1
futures.append(executor.submit(self._download_chunk, start, end, i))
# 4. 收集成功下载的临时文件路径
for future in futures:
result = future.result()
if result:
temp_files.append(result)
# 5. 合并分段文件并清理环境
if len(temp_files) == self.max_workers:
print("所有分段下载完成,正在合并...")
# 必须按照分段ID排序,保证文件拼接顺序正确
temp_files.sort(key=lambda x: int(x.split('.part')[-1]))
with open(self.save_path, "wb") as outfile:
for chunk_path in temp_files:
with open(chunk_path, "rb") as infile:
outfile.write(infile.read())
# 合并完成后删除碎片文件
os.remove(chunk_path)
print(f"下载大功告成!文件保存在: {self.save_path}")
else:
print("部分分段下载失败,合并终止。")
if __name__ == "__main__":
# 测试用例:下载一个示例流媒体文件
url_to_download = "https://example.com/sample_video.mp4"
save_location = "./sample_video.mp4"
# 实例化并运行下载器
downloader = ProxyDownloader(target_url=url_to_download, save_path=save_location, max_workers=5)
downloader.run()
避坑指南:线上实战的 3 个细节
代码写完了,但距离工业级跑批量任务还有几个坑需要注意:- 切忌无脑死等:网络请求必须加上 timeout 参数,建议设置在 15-30 秒左右。没有超时控制的话,服务器响应慢或网络抖动会导致你的连接一直挂着,直到永远。
- 重试要讲策略:遇到报错千万别用 while True 马上猛烈重试。代码里演示了“指数退避”策略——每次失败后等待的时间呈 2 的次方倍增加。这能有效避免在服务器拥堵时你还继续给对方施压。
- 不要滥用代理:拿到了高匿名代理 IP 也要配合请求频率控制,建议单个 IP 每分钟请求不超过 60 次,并且要定期刷新代理池剔除失效节点。