手把手带你用Python撸一个多线程+代理池下载器

简介: 这篇文章介绍了如何使用Python标准库开发一个支持并发和代理池管理的流媒体下载器,以提高大规模数据采集效率。文章提出了基于多线程分段下载的解决方案,并强调了高并发环境下代理IP池的重要性。主要方法包括:构建代理字典、下载文件块和多线程下载合并。实战技巧包括设置超时、指数退避重试和合理使用代理IP,以平衡并发速度和稳定性。
做数据采集的同行们,在爬虫进阶的路上肯定都遇到过这个瓶颈:当目标数据量从几百条飙升到十万级别,尤其是涉及图片、视频等多媒体文件时,普通的单线程下载不仅慢得让人怀疑人生,还极其容易触发目标服务器的反爬策略,导致IP被封。市面上的第三方下载工具要么限速,要么免费额度太少。 想要把稳定性握在自己手里,完全控制并发量和代理轮换,我们需要自己造轮子。今天咱们就从底层逻辑出发,用 Python 原生库写一个支持并发、且原生集成代理池的流媒体下载器。

多线程分段下载的底层逻辑

很多人觉得 Python 有 GIL(全局解释器锁),多线程是“伪多线程”。但在网络请求这种 I/O 密集型任务中,线程在等待网络响应时会释放 GIL,因此多线程并发下载是非常高效的。 多线程下载一个文件的核心流程其实就四步:
  1. 发送 HEAD 请求获取目标文件的大小。
  2. 根据设置的线程数,把文件切分成多个区块。
  3. 利用 HTTP 的 Range 请求头,让不同线程去并发下载对应的区间数据。
  4. 所有分段下载完成后,在本地将临时文件按顺序拼接成完整文件。

爬虫代理的无缝接入

只要并发量一上去,单 IP 极其容易被目标服务器拉黑。想要稳定跑批量任务,利用代理IP池分散请求来源是唯一解。 优质代理的使用有几个黄金原则: 代理要轮换 要有失效检测 、并且 控制单IP的请求频率 。在下面的实战代码中,我们将直接接入爬虫代理。该产品提供了标准化的 HTTP/HTTPS 接口,我们可以将其封装进 requests 请求的 proxies 参数中。

核心代码实战

下面是一段整合了分段下载与爬虫代理接入的核心代码示例,原生依赖 requests ,不需要花里胡哨的第三方框架。代码中我加了详尽的中文注释,大家可以直接复制去跑:
import requests
import time
import os
from concurrent.futures import ThreadPoolExecutor

class ProxyDownloader:
    def __init__(self, target_url, save_path, max_workers=5):
        self.target_url = target_url
        self.save_path = save_path
        self.max_workers = max_workers

        # 亿牛云代理配置信息 (需替换为你官网后台获取的真实信息)
        self.proxy_domain = "proxy.16yun.cn"  # 代理服务器域名或IP
        self.proxy_port = "8100"              # 代理服务器端口
        self.proxy_user = "16YUNXXXXX"        # 通行证账号
        self.proxy_pass = "PASSXXXXX"         # 通行证密码

    def _get_proxies(self):
        """
        构造requests所需的proxies字典,实现代理接入
        """
        proxy_meta = f"http://{self.proxy_user}:{self.proxy_pass}@{self.proxy_domain}:{self.proxy_port}"
        return {
   
            "http": proxy_meta,
            "https": proxy_meta
        }

    def _download_chunk(self, start, end, chunk_id):
        """
        使用代理下载指定的文件区块
        """
        headers = {
   
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            # 核心:通过Range请求头告诉服务器我们需要哪一段数据
            "Range": f"bytes={start}-{end}"
        }

        chunk_filename = f"{self.save_path}.part{chunk_id}"
        proxies = self._get_proxies()

        # 指数退避重试机制,防止网络抖动导致的下载失败
        for attempt in range(3):
            try:
                # 必须设置 timeout,避免无响应的请求一直挂起占用线程
                resp = requests.get(
                    self.target_url, 
                    headers=headers, 
                    proxies=proxies, 
                    timeout=15, 
                    stream=True
                )
                resp.raise_for_status()

                # 将分段数据写入本地临时文件
                with open(chunk_filename, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                print(f"[成功] 分段 {chunk_id} 下载完成。")
                return chunk_filename

            except requests.RequestException as e:
                print(f"[重试] 分段 {chunk_id} 第 {attempt + 1} 次下载失败: {e}")
                # 失败后等待 2的attempt次方秒再重试 (1秒, 2秒, 4秒)
                time.sleep(2 ** attempt)

        print(f"[错误] 分段 {chunk_id} 下载彻底失败,已超过最大重试次数。")
        return None

    def run(self):
        """
        主调度器:获取文件大小 -> 划分区块 -> 多线程并发下载 -> 合并文件
        """
        print("开始解析资源...")
        proxies = self._get_proxies()
        headers = {
   "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}

        # 1. 获取文件大小
        try:
            resp = requests.head(self.target_url, headers=headers, proxies=proxies, timeout=10, allow_redirects=True)
            resp.raise_for_status()
        except requests.RequestException:
            print("HEAD请求失败,无法获取文件信息。")
            return

        content_length = resp.headers.get("Content-Length")
        # 检查服务器是否支持分段下载 (Accept-Ranges: bytes)
        if not content_length or resp.headers.get("Accept-Ranges") != "bytes":
            print("服务器不支持 Range 分段下载,多线程策略失效,需退化为单线程。")
            return

        file_size = int(content_length)
        print(f"目标文件总大小: {file_size / 1024 / 1024:.2f} MB")

        # 2. 计算每个线程负责的区块大小
        chunk_size = file_size // self.max_workers
        futures = []
        temp_files = []

        # 3. 启动线程池进行并发分段下载
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for i in range(self.max_workers):
                start = i * chunk_size
                # 最后一个区块必须兜底处理剩余的所有字节
                end = (i + 1) * chunk_size - 1 if i < self.max_workers - 1 else file_size - 1
                futures.append(executor.submit(self._download_chunk, start, end, i))

        # 4. 收集成功下载的临时文件路径
        for future in futures:
            result = future.result()
            if result:
                temp_files.append(result)

        # 5. 合并分段文件并清理环境
        if len(temp_files) == self.max_workers:
            print("所有分段下载完成,正在合并...")
            # 必须按照分段ID排序,保证文件拼接顺序正确
            temp_files.sort(key=lambda x: int(x.split('.part')[-1]))
            with open(self.save_path, "wb") as outfile:
                for chunk_path in temp_files:
                    with open(chunk_path, "rb") as infile:
                        outfile.write(infile.read())
                    # 合并完成后删除碎片文件
                    os.remove(chunk_path)
            print(f"下载大功告成!文件保存在: {self.save_path}")
        else:
            print("部分分段下载失败,合并终止。")

if __name__ == "__main__":
    # 测试用例:下载一个示例流媒体文件
    url_to_download = "https://example.com/sample_video.mp4" 
    save_location = "./sample_video.mp4"

    # 实例化并运行下载器
    downloader = ProxyDownloader(target_url=url_to_download, save_path=save_location, max_workers=5)
    downloader.run()

避坑指南:线上实战的 3 个细节

代码写完了,但距离工业级跑批量任务还有几个坑需要注意:
  • 切忌无脑死等:网络请求必须加上 timeout 参数,建议设置在 15-30 秒左右。没有超时控制的话,服务器响应慢或网络抖动会导致你的连接一直挂着,直到永远。
  • 重试要讲策略:遇到报错千万别用 while True 马上猛烈重试。代码里演示了“指数退避”策略——每次失败后等待的时间呈 2 的次方倍增加。这能有效避免在服务器拥堵时你还继续给对方施压。
  • 不要滥用代理:拿到了高匿名代理 IP 也要配合请求频率控制,建议单个 IP 每分钟请求不超过 60 次,并且要定期刷新代理池剔除失效节点。
掌握了这些细节,你的爬虫就能在并发速度和稳定性之间找到最佳的平衡点。各位同行如果在数据采集或代理池调度上还有什么玩法,欢迎交流。
相关文章
|
6天前
|
人工智能 JSON 自然语言处理
让教学更智慧:用阿里云百炼工作流,自动生成中小学教材内容#小有可为#有温度的AI
通过可视化工作流编排,将大模型推理能力转化为标准化的教学内容生成引擎。教师只需输入教材标题和适用学段,即可自动获得结构完整、符合课程标准的章节内容,大幅降低备课门槛,助力教育资源均衡化。
463 123
|
8天前
|
人工智能 定位技术 SEO
我学 GEO 第 15 天:终于知道AI GEO该如何做?
我是暴走的莉莉酱,边旅行边研究AI GEO的数字游民。专注普通人如何提升“AI可见度”——让AI在回答用户问题时准确识别、理解并推荐你。不讲玄学,只做可测、可调、可持续的GEO实践。
444 127
|
10天前
|
机器学习/深度学习 人工智能 调度
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
HappyHorse 1.1 是新一代视频生成大模型,全面升级动态表现力、角色一致性、指令遵循、视觉质感与音画协同能力。支持I2V/T2V/R2V三类生成,适配短剧、电商广告、品牌营销等场景,提供高质、流畅、可控的AI视频生产力。
758 5
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
|
2天前
|
消息中间件 存储 Kafka
Kafka 原生消息入湖能力上线!一键打通实时流与数据湖
阿里云消息队列 Kafka 版正式上线原生消息入湖能力。
216 121
|
2天前
|
人工智能 安全 Cloud Native
Higress 新发布:AI Gateway 能力增强,Gateway API 及其推理扩展持续打磨
增强 AI 网关能力,持续打磨 Gateway API 及其推理扩展。
263 122
|
8天前
|
缓存 人工智能 运维
阿里云618百炼大模型Qwen3.7-Max功能、免费试用、订阅计费、配置接入详解
Qwen3.7-MAX是阿里云百炼平台推出的通义千问3.7系列旗舰大语言模型,专为智能体时代复杂任务打造,依托阿里云全域算力与自研技术,在逻辑推理、长文本处理、代码工程、长周期自主执行等领域达到行业顶尖水平。2026年618期间,该模型推出多重免费试用权益、按量计费5折、订阅套餐优惠等专属福利,覆盖个人开发者、团队与企业全场景需求,以下从核心功能、免费试用、订阅计费、配置接入四方面展开详细解析。
453 123
|
6天前
|
人工智能 自然语言处理 API
阿里云Token Plan团队版解析:功能、三档套餐与省钱订阅指南
阿里云百炼平台推出的Token Plan团队版,是面向企业与团队的AI大模型订阅服务,以Credits为统一计量单位,整合文本与图像生成模型,提供团队管理、数据安全、多工具兼容等核心能力,解决团队零散订阅AI服务的管理混乱、成本失控、数据安全等痛点。本文将从核心定位、套餐详情、计费规则、团队管理、工具兼容、便宜订阅技巧等方面,全面解析Token Plan团队版,帮助企业与团队高效、低成本地使用AI服务。
332 108
|
15天前
|
Linux 程序员 数据格式
【2026最新】Notepad++下载、安装和使用一篇搞定(附中文版安装包)
Notepad++ 是一款免费开源、轻量高效的 Windows 文本编辑器,支持 C/Python/HTML 等 80+ 语言语法高亮、代码折叠、正则替换、编码转换及插件扩展,专为程序员与文本处理用户打造,完美替代系统记事本。(239字)